http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
new file mode 100644
index 0000000..1677fed
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
@@ -0,0 +1,287 @@
+package mvm.rya.indexing;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.joda.time.DateTime;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.QueryModelVisitor;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+
+// Indexing node for temporal expressions, inserted into the execution plan
+// to delegate the temporal portion of a query to the temporal index.
+public class TemporalTupleSet extends ExternalTupleSet {
+
+    private final Configuration conf;
+    private final TemporalIndexer temporalIndexer;
+    private final IndexingExpr filterInfo;
+
+    public TemporalTupleSet(final IndexingExpr filterInfo, final TemporalIndexer temporalIndexer) {
+        this.filterInfo = filterInfo;
+        this.temporalIndexer = temporalIndexer;
+        conf = temporalIndexer.getConf();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Set<String> getBindingNames() {
+        return filterInfo.getBindingNames();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Note that we need a deep copy for everything that (during optimizations)
+     * can be altered via {@link #visitChildren(QueryModelVisitor)}
+     */
+    @Override
+    public TemporalTupleSet clone() {
+        return new TemporalTupleSet(filterInfo, temporalIndexer);
+    }
+
+    @Override
+    public double cardinality() {
+        return 0.0; // No idea how to estimate cardinality here.
+    }
+
+    @Override
+    public String getSignature() {
+        return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " ");
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (!(other instanceof TemporalTupleSet)) {
+            return false;
+        }
+        final TemporalTupleSet arg = (TemporalTupleSet) other;
+        return filterInfo.equals(arg.filterInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = 17;
+        result = 31*result + filterInfo.hashCode();
+
+        return result;
+    }
+
+    /**
+     * Returns an iterator over the result set associated with the contained IndexingExpr.
+     * <p>
+     * Should be thread-safe; concurrent invocation of this method can be
+     * expected with some query evaluators.
+     */
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
+            throws QueryEvaluationException {
+        final URI funcURI = filterInfo.getFunction();
+        final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI);
+
+        if (filterInfo.getArguments().length > 1) {
+            throw new IllegalArgumentException("Index functions do not support more than two arguments.");
+        }
+
+        final String queryText = filterInfo.getArguments()[0].stringValue();
+        return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
+    }
+
+    // Returns the appropriate search function for a given URI.
+    // These search functions are used by the TemporalIndexer to query the temporal index.
+    private class TemporalSearchFunctionFactory {
+        private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
+        private final Configuration conf;
+
+        public TemporalSearchFunctionFactory(final Configuration conf) {
+            this.conf = conf;
+        }
+
+        /**
+         * Get a {@link SearchFunction} for a given URI.
+         *
+         * @param searchFunction the search function URI
+         * @return the matching {@link SearchFunction}, or null if none could be resolved
+         */
+        public SearchFunction getSearchFunction(final URI searchFunction) {
+            SearchFunction temporalFunc = null;
+            try {
+                temporalFunc = getSearchFunctionInternal(searchFunction);
+            } catch (final QueryEvaluationException e) {
+                e.printStackTrace();
+            }
+
+            return temporalFunc;
+        }
+
+        private SearchFunction getSearchFunctionInternal(final URI searchFunction) throws QueryEvaluationException {
+            final SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
+
+            if (sf != null) {
+                return sf;
+            } else {
+                throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
+            }
+        }
+
+        private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
+                return temporalIndexer.queryInstantAfterInstant(queryInstant, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantAfterInstant";
+            }
+        };
+        private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
+                return temporalIndexer.queryInstantBeforeInstant(queryInstant, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantBeforeInstant";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
+                return temporalIndexer.queryInstantEqualsInstant(queryInstant, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantEqualsInstant";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
+                return temporalIndexer.queryInstantAfterInterval(queryInterval, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantAfterInterval";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
+                return temporalIndexer.queryInstantBeforeInterval(queryInterval, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantBeforeInterval";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
+                return temporalIndexer.queryInstantInsideInterval(queryInterval, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantInsideInterval";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
+                return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantHasBeginningInterval";
+            }
+        };
+
+        private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() {
+            @Override
+            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
+                    final StatementConstraints constraints) throws QueryEvaluationException {
+                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
+                return temporalIndexer.queryInstantHasEndInterval(queryInterval, constraints);
+            }
+
+            @Override
+            public String toString() {
+                return "TEMPORAL_InstantHasEndInterval";
+            }
+        };
+
+        {
+            final String TEMPORAL_NS = "tag:rya-rdf.org,2015:temporal#";
+
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "after"), TEMPORAL_InstantAfterInstant);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "before"), TEMPORAL_InstantBeforeInstant);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "equals"), TEMPORAL_InstantEqualsInstant);
+
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "beforeInterval"), TEMPORAL_InstantBeforeInterval);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "afterInterval"), TEMPORAL_InstantAfterInterval);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "insideInterval"), TEMPORAL_InstantInsideInterval);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "hasBeginningInterval"), TEMPORAL_InstantHasBeginningInterval);
+            SEARCH_FUNCTION_MAP.put(new URIImpl(TEMPORAL_NS + "hasEndInterval"), TEMPORAL_InstantHasEndInterval);
+        }
+    }
+}
\ No newline at end of file

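For reference, a hedged sketch (not part of the commit) of the kind of SPARQL filter a TemporalTupleSet is extracted from. The time:after function URI matches the registrations in TemporalSearchFunctionFactory above, and the instant literal is in the RFC 3339 form that DateTime.parse accepts; the predicate URI is a hypothetical example.

    // Hedged sketch: a query whose FILTER uses one of the temporal function
    // URIs registered in TemporalSearchFunctionFactory.
    String sparql =
        "PREFIX time: <tag:rya-rdf.org,2015:temporal#>\n"
        + "SELECT ?event ?when WHERE {\n"
        + "  ?event <http://example.org/atTime> ?when .\n"            // hypothetical temporal predicate
        + "  FILTER(time:after(?when, \"2015-06-01T00:00:00Z\"))\n"   // instant parsed by DateTime.parse
        + "}";

During optimization such a filter and its statement pattern can be replaced by a TemporalTupleSet node, whose evaluate(...) then dispatches to TemporalIndexer.queryInstantAfterInstant via the factory above.
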
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
new file mode 100644
index 0000000..7c608de
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -0,0 +1,418 @@
+package mvm.rya.indexing.accumulo;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.URIImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.indexing.FilterFunctionOptimizer;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.entity.EntityOptimizer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer;
+import mvm.rya.indexing.accumulo.freetext.Tokenizer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import mvm.rya.indexing.external.PrecomputedJoinIndexer;
+import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer;
+import mvm.rya.indexing.pcj.matching.PCJOptimizer;
+
+/**
+ * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects.
+ * This class will soon be deprecated: use the installer for the set methods and {@link RyaDetails} for the get methods.
+ * New code must separate parameters that are set at Rya install time from those that are specific to the client.
+ * Also, Accumulo index tables are pushed down to the implementation and not configured in conf.
+ */
+public class ConfigUtils {
+    private static final Logger logger = Logger.getLogger(ConfigUtils.class);
+
+    public static final String CLOUDBASE_TBL_PREFIX = "sc.cloudbase.tableprefix";
+    public static final String CLOUDBASE_AUTHS = "sc.cloudbase.authorizations";
+    public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
+    public static final String CLOUDBASE_ZOOKEEPERS = "sc.cloudbase.zookeepers";
+    public static final String CLOUDBASE_USER = "sc.cloudbase.username";
+    public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";
+
+    public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads";
+    public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency";
+    public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory";
+
+    public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit";
+
+    public static final String USE_FREETEXT = "sc.use_freetext";
+    public static final String USE_TEMPORAL = "sc.use_temporal";
+    public static final String USE_ENTITY = "sc.use_entity";
+    public static final String USE_PCJ = "sc.use_pcj";
+    public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj";
+    public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
+
+    public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+    public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
+    public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
+    public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
+
+
+    public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail";
+    public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail";
+
+    public static final String USE_MOCK_INSTANCE = ".useMockInstance";
+
+    public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions";
+
+    private static final int WRITER_MAX_WRITE_THREADS = 1;
+    private static final long WRITER_MAX_LATENCY = Long.MAX_VALUE;
+    private static final long WRITER_MAX_MEMORY = 10000L;
+
+    public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan";
+
+    public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates";
+    public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text";
+    public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term";
+
+    public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class";
+
+    public static final String GEO_PREDICATES_LIST = "sc.geo.predicates";
+
+    public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates";
+
+    public static final String USE_MONGO = "sc.useMongo";
+
+    public static boolean isDisplayQueryPlan(final Configuration conf) {
+        return conf.getBoolean(DISPLAY_QUERY_PLAN, false);
+    }
+
+    /**
+     * Get a value from the configuration and throw an exception if the value is not set.
+     *
+     * @param conf the configuration to read
+     * @param key the configuration key to look up
+     * @return the value associated with the key
+     */
+    private static String getStringCheckSet(final Configuration conf, final String key) {
+        final String value = conf.get(key);
+        requireNonNull(value, key + " not set");
+        return value;
+    }
+
+    /**
+     * @param conf
+     * @param tablename
+     * @return if the table was created
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws TableExistsException
+     */
+    public static boolean createTableIfNotExists(final Configuration conf, final String tablename)
+            throws AccumuloException, AccumuloSecurityException, TableExistsException {
+        final TableOperations tops = getConnector(conf).tableOperations();
+        if (!tops.exists(tablename)) {
+            logger.info("Creating table: " + tablename);
+            tops.create(tablename);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Look up the table name prefix in the conf and throw an error if it is null.
+     * In the future, the table prefix should come from {@link RyaDetails} (the Rya instance name);
+     * getting info from RyaDetails should happen within RyaSailFactory and not ConfigUtils.
+     * @param conf Rya configuration map from which the prefix (instance name) is extracted
+     * @return index table prefix corresponding to this Rya instance
+     */
+    public static String getTablePrefix(final Configuration conf) {
+        final String tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
+        requireNonNull(tablePrefix, "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX
+                + " not set.  Cannot generate table name.");
+        return tablePrefix;
+    }
+
+    public static int getFreeTextTermLimit(final Configuration conf) {
+        return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100);
+    }
+
+    public static Set<URI> getFreeTextPredicates(final Configuration conf) {
+        return getPredicates(conf, FREETEXT_PREDICATES_LIST);
+    }
+
+    public static Set<URI> getGeoPredicates(final Configuration conf) {
+        return getPredicates(conf, GEO_PREDICATES_LIST);
+    }
+    /**
+     * Used for indexing statements about date/time instants and intervals.
+     * @param conf the configuration to read
+     * @return Set of predicate URIs whose objects should be date-time literals.
+     */
+    public static Set<URI> getTemporalPredicates(final Configuration conf) {
+        return getPredicates(conf, TEMPORAL_PREDICATES_LIST);
+    }
+
+    protected static Set<URI> getPredicates(final Configuration conf, final String confName) {
+        final String[] validPredicateStrings = conf.getStrings(confName, new String[] {});
+        final Set<URI> predicates = new HashSet<URI>();
+        for (final String predicateString : validPredicateStrings) {
+            predicates.add(new URIImpl(predicateString));
+        }
+        return predicates;
+    }
+
+    public static Tokenizer getFreeTextTokenizer(final Configuration conf) {
+        final Class<? extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class);
+        return ReflectionUtils.newInstance(c, conf);
+    }
+
+    public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf)
+            throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+        final Connector connector = ConfigUtils.getConnector(conf);
+        return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+    }
+
+    public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf)
+            throws AccumuloException, AccumuloSecurityException {
+        final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf);
+        final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf);
+        final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf);
+        final Connector connector = ConfigUtils.getConnector(conf);
+        return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS);
+    }
+
+    public static Scanner createScanner(final String tablename, final Configuration conf)
+            throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+        final Connector connector = ConfigUtils.getConnector(conf);
+        final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+        return connector.createScanner(tablename, auths);
+    }
+
+    public static BatchScanner createBatchScanner(final String tablename, final Configuration conf)
+            throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+        final Connector connector = ConfigUtils.getConnector(conf);
+        final Authorizations auths = ConfigUtils.getAuthorizations(conf);
+        Integer numThreads = null;
+        if (conf instanceof RdfCloudTripleStoreConfiguration) {
+            numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads();
+        } else {
+            numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2);
+        }
+        return connector.createBatchScanner(tablename, auths, numThreads);
+    }
+
+    public static int getWriterMaxWriteThreads(final Configuration conf) {
+        return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS);
+    }
+
+    public static long getWriterMaxLatency(final Configuration conf) {
+        return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATENCY);
+    }
+
+    public static long getWriterMaxMemory(final Configuration conf) {
+        return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY);
+    }
+
+    public static String getUsername(final JobContext job) {
+        return getUsername(job.getConfiguration());
+    }
+
+    public static String getUsername(final Configuration conf) {
+        return conf.get(CLOUDBASE_USER);
+    }
+
+    public static Authorizations getAuthorizations(final JobContext job) {
+        return getAuthorizations(job.getConfiguration());
+    }
+
+    public static Authorizations getAuthorizations(final Configuration conf) {
+        final String authString = conf.get(CLOUDBASE_AUTHS, "");
+        if (authString.isEmpty()) {
+            return new Authorizations();
+        }
+        return new Authorizations(authString.split(","));
+    }
+
+    public static Instance getInstance(final JobContext job) {
+        return getInstance(job.getConfiguration());
+    }
+
+    public static Instance getInstance(final Configuration conf) {
+        if (useMockInstance(conf)) {
+            return new MockInstance(conf.get(CLOUDBASE_INSTANCE));
+        }
+        return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS));
+    }
+
+    public static String getPassword(final JobContext job) {
+        return getPassword(job.getConfiguration());
+    }
+
+    public static String getPassword(final Configuration conf) {
+        return conf.get(CLOUDBASE_PASSWORD, "");
+    }
+
+    public static Connector getConnector(final JobContext job) throws AccumuloException, AccumuloSecurityException {
+        return getConnector(job.getConfiguration());
+    }
+
+    public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException {
+        final Instance instance = ConfigUtils.getInstance(conf);
+
+        return instance.getConnector(getUsername(conf), getPassword(conf));
+    }
+
+    public static boolean useMockInstance(final Configuration conf) {
+        return conf.getBoolean(USE_MOCK_INSTANCE, false);
+    }
+
+    protected static int getNumPartitions(final Configuration conf) {
+        return conf.getInt(NUM_PARTITIONS, 25);
+    }
+
+    public static int getFreeTextDocNumPartitions(final Configuration conf) {
+        return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf));
+    }
+
+    public static int getFreeTextTermNumPartitions(final Configuration conf) {
+        return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf));
+    }
+
+    public static boolean getUseFreeText(final Configuration conf) {
+        return conf.getBoolean(USE_FREETEXT, false);
+    }
+
+    public static boolean getUseTemporal(final Configuration conf) {
+        return conf.getBoolean(USE_TEMPORAL, false);
+    }
+
+    public static boolean getUseEntity(final Configuration conf) {
+        return conf.getBoolean(USE_ENTITY, false);
+    }
+
+    public static boolean getUsePCJ(final Configuration conf) {
+        return conf.getBoolean(USE_PCJ, false);
+    }
+
+    public static boolean getUseOptimalPCJ(final Configuration conf) {
+        return conf.getBoolean(USE_OPTIMAL_PCJ, false);
+    }
+    
+    public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
+        return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
+    }
+
+
+    /**
+     * @return The name of the Fluo application this instance of Rya is
+     *   using to incrementally update PCJs.
+     */
+    //TODO delete this eventually and use Details table
+    public static Optional<String> getFluoAppName(final Configuration conf) {
+        return Optional.fromNullable(conf.get(FLUO_APP_NAME));
+    }
+
+
+    public static boolean getUseMongo(final Configuration conf) {
+        return conf.getBoolean(USE_MONGO, false);
+    }
+
+
+    public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
+
+        final List<String> indexList = Lists.newArrayList();
+        final List<String> optimizers = Lists.newArrayList();
+
+        boolean useFilterIndex = false;
+
+        if (ConfigUtils.getUseMongo(conf)) {
+            if (getUseFreeText(conf)) {
+                indexList.add(MongoFreeTextIndexer.class.getName());
+                useFilterIndex = true;
+            }
+        } else {
+            if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
+                conf.setPcjOptimizer(PCJOptimizer.class);
+            }
+
+            if (getUsePcjUpdaterIndex(conf)) {
+                indexList.add(PrecomputedJoinIndexer.class.getName());
+            }
+
+            if (getUseFreeText(conf)) {
+                indexList.add(AccumuloFreeTextIndexer.class.getName());
+                useFilterIndex = true;
+            }
+
+            if (getUseTemporal(conf)) {
+                indexList.add(AccumuloTemporalIndexer.class.getName());
+                useFilterIndex = true;
+            }
+        }
+
+        if (useFilterIndex) {
+            optimizers.add(FilterFunctionOptimizer.class.getName());
+        }
+
+        if (getUseEntity(conf)) {
+            indexList.add(EntityCentricIndex.class.getName());
+            optimizers.add(EntityOptimizer.class.getName());
+        }
+
+        conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{}));
+        conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{}));
+    }
+}

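A minimal usage sketch (not part of the commit) for the class above, assuming an AccumuloRdfConfiguration, which extends RdfCloudTripleStoreConfiguration; the instance, zookeeper, and credential values are placeholders.

    // Hedged sketch: enable the temporal and free-text indexers on a configuration.
    AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
    conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_");  // table prefix (instance name)
    conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "accumulo");                // placeholder
    conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, "localhost:2181");        // placeholder
    conf.set(ConfigUtils.CLOUDBASE_USER, "root");                        // placeholder
    conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "secret");                  // placeholder
    conf.setBoolean(ConfigUtils.USE_TEMPORAL, true);
    conf.setBoolean(ConfigUtils.USE_FREETEXT, true);
    ConfigUtils.setIndexers(conf);  // adds AccumuloTemporalIndexer and AccumuloFreeTextIndexer,
                                    // plus FilterFunctionOptimizer, to the configuration
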
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
new file mode 100644
index 0000000..4c1a3ad
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java
@@ -0,0 +1,443 @@
+package mvm.rya.indexing.accumulo.entity;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE;
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil;
+import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.DocIdIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Bytes;
+
+public class AccumuloDocIdIndexer implements DocIdIndexer {
+
+    private BatchScanner bs;
+    private AccumuloRdfConfiguration conf;
+
+    public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException {
+        // The cast below requires an AccumuloRdfConfiguration, so check for that type specifically.
+        Preconditions.checkArgument(conf instanceof AccumuloRdfConfiguration, "conf must be an instance of AccumuloRdfConfiguration");
+        this.conf = (AccumuloRdfConfiguration) conf;
+        //Connector conn = ConfigUtils.getConnector(conf);
+    }
+
+    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery,
+            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
+
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq1;
+        try {
+            pq1 = parser.parseQuery(sparqlQuery, null);
+        } catch (MalformedQueryException e) {
+            // A malformed query cannot be evaluated; fail fast instead of NPE-ing below.
+            throw new QueryEvaluationException(e);
+        }
+
+        TupleExpr te1 = pq1.getTupleExpr();
+        List<StatementPattern> spList1 = StatementPatternCollector.process(te1);
+
+        if (StarQuery.isValidStarQuery(spList1)) {
+            StarQuery sq1 = new StarQuery(spList1);
+            return queryDocIndex(sq1, constraints);
+        } else {
+            throw new IllegalArgumentException("Invalid star query!");
+        }
+    }
+
+    @Override
+    public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query,
+            Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException {
+
+        final StarQuery starQ = query;
+        final Iterator<BindingSet> bs = constraints.iterator();
+        final Iterator<BindingSet> bs2 = constraints.iterator();
+        final Set<String> unCommonVarNames;
+        final Set<String> commonVarNames;
+        if (bs2.hasNext()) {
+            BindingSet currBs = bs2.next();
+            commonVarNames = StarQuery.getCommonVars(query, currBs);
+            unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames);
+        } else {
+            commonVarNames = Sets.newHashSet();
+            unCommonVarNames = Sets.newHashSet();
+        }
+
+        if (commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) {
+
+            final HashMultimap<String, BindingSet> map = HashMultimap.create();
+            final String commonVar = starQ.getCommonVarName();
+            final Iterator<Entry<Key, Value>> intersections;
+            final BatchScanner scan;
+            Set<Range> ranges = Sets.newHashSet();
+
+            while(bs.hasNext()) {
+
+                BindingSet currentBs = bs.next();
+
+                if(currentBs.getBinding(commonVar) == null) {
+                    continue;
+                }
+
+                String row = currentBs.getBinding(commonVar).getValue().stringValue();
+                ranges.add(new Range(row));
+                map.put(row, currentBs);
+
+            }
+            scan = runQuery(starQ, ranges);
+            intersections = scan.iterator();
+
+
+            return new CloseableIteration<BindingSet, QueryEvaluationException>() {
+
+                private QueryBindingSet currentSolutionBs = null;
+                private boolean hasNextCalled = false;
+                private boolean isEmpty = false;
+                private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator();
+                private BindingSet currentBs;
+                private Key key;
+
+                @Override
+                public boolean hasNext() throws QueryEvaluationException {
+                    if (!hasNextCalled && !isEmpty) {
+                        while (inputSet.hasNext() || intersections.hasNext()) {
+                            if (!inputSet.hasNext()) {
+                                key = intersections.next().getKey();
+                                inputSet = map.get(key.getRow().toString()).iterator();
+                            }
+                            currentBs = inputSet.next();
+                            currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames);
+
+                            if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() + 1) {
+                                hasNextCalled = true;
+                                return true;
+                            }
+                        }
+
+                        isEmpty = true;
+                        return false;
+                    } else if (isEmpty) {
+                        return false;
+                    } else {
+                        return true;
+                    }
+                }
+
+
+                @Override
+                public BindingSet next() throws QueryEvaluationException {
+
+                    if (hasNextCalled) {
+                        hasNextCalled = false;
+                    } else if (isEmpty) {
+                        throw new NoSuchElementException();
+                    } else {
+                        if (this.hasNext()) {
+                            hasNextCalled = false;
+                        } else {
+                            throw new NoSuchElementException();
+                        }
+                    }
+
+                    return currentSolutionBs;
+                }
+
+                @Override
+                public void remove() throws QueryEvaluationException {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public void close() throws QueryEvaluationException {
+                    scan.close();
+                }
+
+            };
+
+
+        } else {
+
+            return new CloseableIteration<BindingSet, QueryEvaluationException>() {
+
+                @Override
+                public void remove() throws QueryEvaluationException {
+                    throw new UnsupportedOperationException();
+                }
+
+                private Iterator<Entry<Key, Value>> intersections = null;
+                private QueryBindingSet currentSolutionBs = null;
+                private boolean hasNextCalled = false;
+                private boolean isEmpty = false;
+                private boolean init = false;
+                private BindingSet currentBs;
+                private StarQuery sq = new StarQuery(starQ);
+                private Set<Range> emptyRangeSet = Sets.newHashSet();
+                private BatchScanner scan;
+
+                @Override
+                public BindingSet next() throws QueryEvaluationException {
+                    if (hasNextCalled) {
+                        hasNextCalled = false;
+                    } else if (isEmpty) {
+                        throw new NoSuchElementException();
+                    } else {
+                        if (this.hasNext()) {
+                            hasNextCalled = false;
+                        } else {
+                            throw new NoSuchElementException();
+                        }
+                    }
+                    return currentSolutionBs;
+                }
+
+                @Override
+                public boolean hasNext() throws QueryEvaluationException {
+
+                    if (!init) {
+                        if (intersections == null && bs.hasNext()) {
+                            currentBs = bs.next();
+                            sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
+                            scan = runQuery(sq, emptyRangeSet);
+                            intersections = scan.iterator();
+                            // binding set empty
+                        } else if (intersections == null && !bs.hasNext()) {
+                            currentBs = new QueryBindingSet();
+                            scan = runQuery(starQ, emptyRangeSet);
+                            intersections = scan.iterator();
+                        }
+
+                        init = true;
+                    }
+
+                    if (!hasNextCalled && !isEmpty) {
+                        while (intersections.hasNext() || bs.hasNext()) {
+                            if (!intersections.hasNext()) {
+                                scan.close();
+                                currentBs = bs.next();
+                                sq = StarQuery.getConstrainedStarQuery(sq, currentBs);
+                                scan = runQuery(sq, emptyRangeSet);
+                                intersections = scan.iterator();
+                            }
+                            if (intersections.hasNext()) {
+                                currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs,
+                                        unCommonVarNames);
+                            } else {
+                                continue;
+                            }
+
+                            if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) {
+                                hasNextCalled = true;
+                                return true;
+                            } else if (currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) {
+                                hasNextCalled = true;
+                                return true;
+                            }
+                        }
+
+                        isEmpty = true;
+                        return false;
+                    } else if (isEmpty) {
+                        return false;
+                    } else {
+                        return true;
+                    }
+                }
+
+                @Override
+                public void close() throws QueryEvaluationException {
+                    scan.close();
+                }
+            };
+        }
+    }
+
+    private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) {
+
+
+        QueryBindingSet currentSolutionBs = new QueryBindingSet();
+
+        Text row = key.getRow();
+        Text cq = key.getColumnQualifier();
+
+
+        String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
+
+        boolean commonVarSet = false;
+
+        // If the common var is constant, there is no common variable to assign a value to.
+        if (sq.commonVarConstant()) {
+            commonVarSet = true;
+        }
+
+        if (!commonVarSet && sq.isCommonVarURI()) {
+            RyaURI rURI = new RyaURI(row.toString());
+            currentSolutionBs.addBinding(sq.getCommonVarName(),
+                    RyaToRdfConversions.convertValue(rURI));
+            commonVarSet = true;
+        }
+
+        for (String s : sq.getUnCommonVars()) {
+
+            byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes();
+            int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE);
+            int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE);
+            int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE);
+            byte[] tripleComponent = Arrays.copyOfRange(cqBytes, firstIndex + 1, secondIndex);
+            byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex);
+            byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length);
+
+            if (new String(tripleComponent).equals("object")) {
+                byte[] object = Bytes.concat(cqContent, objType);
+                org.openrdf.model.Value v = null;
+                try {
+                    v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(object));
+                } catch (RyaTypeResolverException e) {
+                    e.printStackTrace();
+                }
+                currentSolutionBs.addBinding(s, v);
+
+            } else if (new String(tripleComponent).equals("subject")) {
+                if (!commonVarSet) {
+                    byte[] object = Bytes.concat(row.getBytes(), objType);
+                    org.openrdf.model.Value v = null;
+                    try {
+                        v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize(object));
+                    } catch (RyaTypeResolverException e) {
+                        e.printStackTrace();
+                    }
+                    currentSolutionBs.addBinding(sq.getCommonVarName(), v);
+                    commonVarSet = true;
+                }
+                RyaURI rURI = new RyaURI(new String(cqContent));
+                currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI));
+            } else {
+                throw new IllegalArgumentException("Invalid row.");
+            }
+        }
+        for (String s : unCommonVar) {
+            currentSolutionBs.addBinding(s, currentBs.getValue(s));
+        }
+        return currentSolutionBs;
+    }
+
+    private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException {
+
+        try {
+            if (ranges.size() == 0) {
+                String rangeText = query.getCommonVarValue();
+                Range r;
+                if (rangeText != null) {
+                    r = new Range(new Text(query.getCommonVarValue()));
+                } else {
+                    r = new Range();
+                }
+                ranges = Collections.singleton(r);
+            }
+
+            Connector accCon = ConfigUtils.getConnector(conf);
+            IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class);
+
+            DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond());
+
+            if (query.hasContext()) {
+                DocumentIndexIntersectingIterator.setContext(is, query.getContextURI());
+            }
+            bs = accCon.createBatchScanner(EntityCentricIndex.getTableName(conf),
+                    new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15);
+            bs.addScanIterator(is);
+            bs.setRanges(ranges);
+
+            return bs;
+
+        } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+            throw new QueryEvaluationException(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        //TODO generate an exception when BS passed in -- scanner closed
+//        if (bs != null) {
+//            bs.close();
+//        }
+    }
+
+}

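A hedged sketch (not part of the commit) of driving the indexer above with a "star query": statement patterns sharing a common subject variable, per the StarQuery.isValidStarQuery check. The predicate URIs are hypothetical and the checked exceptions are left to the caller.

    // Hedged sketch: query the entity-centric index with a star query.
    AccumuloDocIdIndexer indexer = new AccumuloDocIdIndexer(conf);  // conf: an AccumuloRdfConfiguration
    String starQuery =
        "SELECT ?person ?name ?age WHERE {\n"
        + "  ?person <http://example.org/name> ?name .\n"   // hypothetical predicates sharing
        + "  ?person <http://example.org/age> ?age .\n"     // the common subject ?person
        + "}";
    CloseableIteration<BindingSet, QueryEvaluationException> results =
            indexer.queryDocIndex(starQuery, Collections.<BindingSet> emptyList());
    try {
        while (results.hasNext()) {
            System.out.println(results.next());  // one BindingSet per matching entity
        }
    } finally {
        results.close();
    }
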
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
new file mode 100644
index 0000000..9a9daa5
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
@@ -0,0 +1,327 @@
+package mvm.rya.indexing.accumulo.entity;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.openrdf.model.URI;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Bytes;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaContext;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+public class EntityCentricIndex extends AbstractAccumuloIndexer {
+
+    private static final Logger logger = Logger.getLogger(EntityCentricIndex.class);
+    private static final String TABLE_SUFFIX = "EntityCentricIndex";
+
+    private AccumuloRdfConfiguration conf;
+    private BatchWriter writer;
+    private boolean isInit = false;
+
+    private void initInternal() throws AccumuloException, AccumuloSecurityException,
+            TableNotFoundException, IOException, TableExistsException {
+        ConfigUtils.createTableIfNotExists(conf, getTableName());
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    // Initialization occurs in setConf because the index is created using reflection.
+    @Override
+    public void setConf(final Configuration conf) {
+        if (conf instanceof AccumuloRdfConfiguration) {
+            this.conf = (AccumuloRdfConfiguration) conf;
+        } else {
+            this.conf = new AccumuloRdfConfiguration(conf);
+        }
+        if (!isInit) {
+            try {
+                initInternal();
+                isInit = true;
+            } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException
+                    | TableExistsException | IOException e) {
+                logger.warn("Unable to initialize index.  Throwing Runtime Exception. ", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Get the Accumulo table used by this index.  
+     * @return table used by instances of this index
+     */
+    @Override
+    public String getTableName() {
+        return getTableName(conf);
+    }
+    
+    /**
+     * Get the Accumulo table that will be used by this index.
+     * @param conf configuration holding the table prefix
+     * @return table name guaranteed to be used by instances of this index
+     */
+    public static String getTableName(Configuration conf) {
+        return ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX;
+    }
+
+    @Override
+    public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException {
+        try {
+            this.writer = writer.getBatchWriter(getTableName());
+        } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void storeStatement(final RyaStatement stmt) throws IOException {
+        Preconditions.checkNotNull(writer, "BatchWriter not Set");
+        try {
+            for (final TripleRow row : serializeStatement(stmt)) {
+                writer.addMutation(createMutation(row));
+            }
+        } catch (final MutationsRejectedException | RyaTypeResolverException e) {
+            throw new IOException(e);
+        }
+    }
+
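+    /**
+     * Deletes both entity-centric rows for the given statement, mirroring {@link #storeStatement}.
+     */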
+    @Override
+    public void deleteStatement(final RyaStatement stmt) throws IOException {
+        Preconditions.checkNotNull(writer, "BatchWriter not Set");
+        try {
+            for (final TripleRow row : serializeStatement(stmt)) {
+                writer.addMutation(deleteMutation(row));
+            }
+        } catch (final MutationsRejectedException | RyaTypeResolverException e) {
+            throw new IOException(e);
+        }
+    }
+
+    protected Mutation deleteMutation(final TripleRow tripleRow) {
+        final Mutation m = new Mutation(new Text(tripleRow.getRow()));
+
+        final byte[] columnFamily = tripleRow.getColumnFamily();
+        final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+        final byte[] columnQualifier = tripleRow.getColumnQualifier();
+        final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+
+        final byte[] columnVisibility = tripleRow.getColumnVisibility();
+        final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+
+        m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp());
+        return m;
+    }
+
+    public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException {
+        final Collection<Mutation> m = Lists.newArrayList();
+        for (final TripleRow tr : serializeStatement(stmt)){
+            m.add(createMutation(tr));
+        }
+        return m;
+    }
+
+    private static Mutation createMutation(final TripleRow tripleRow) {
+        final Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+        final byte[] columnVisibility = tripleRow.getColumnVisibility();
+        final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+        final Long timestamp = tripleRow.getTimestamp();
+        final byte[] value = tripleRow.getValue();
+        final Value v = value == null ? EMPTY_VALUE : new Value(value);
+        final byte[] columnQualifier = tripleRow.getColumnQualifier();
+        final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+        final byte[] columnFamily = tripleRow.getColumnFamily();
+        final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+        mutation.put(cfText, cqText, cv, timestamp, v);
+        return mutation;
+    }
+
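+    /**
+     * Serialize a statement into two rows: one keyed by the subject with the object in the column
+     * qualifier, and one keyed by the object with the subject in the column qualifier, so a scan
+     * on either entity finds the statement.
+     */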
+    private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException {
+        final RyaURI subject = stmt.getSubject();
+        final RyaURI predicate = stmt.getPredicate();
+        final RyaType object = stmt.getObject();
+        final RyaURI context = stmt.getContext();
+        final Long timestamp = stmt.getTimestamp();
+        final byte[] columnVisibility = stmt.getColumnVisibility();
+        final byte[] value = stmt.getValue();
+        assert subject != null && predicate != null && object != null;
+        final byte[] cf = (context == null) ? EMPTY_BYTES : context.getData().getBytes();
+        final byte[] subjBytes = subject.getData().getBytes();
+        final byte[] predBytes = predicate.getData().getBytes();
+        final byte[][] objBytes = RyaContext.getInstance().serializeType(object);
+
+        return Lists.newArrayList(new TripleRow(subjBytes,
+            predBytes,
+            Bytes.concat(cf, DELIM_BYTES,
+                "object".getBytes(), DELIM_BYTES,
+                objBytes[0], objBytes[1]),
+            timestamp,
+            columnVisibility,
+            value),
+            new TripleRow(objBytes[0],
+                predBytes,
+                Bytes.concat(cf, DELIM_BYTES,
+                    "subject".getBytes(), DELIM_BYTES,
+                    subjBytes, objBytes[1]),
+                timestamp,
+                columnVisibility,
+                value));
+    }
+
+    /**
+     * Deserialize a row from the entity-centric index.
+     * @param key Row key, contains statement data
+     * @param value Row value
+     * @return The statement represented by the row
+     * @throws IOException if edge direction can't be extracted as expected.
+     * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object.
+     */
+    public static RyaStatement deserializeStatement(Key key, Value value) throws RyaTypeResolverException, IOException {
+        assert key != null;
+        assert value != null;
+        byte[] entityBytes = key.getRowData().toArray();
+        byte[] predicateBytes = key.getColumnFamilyData().toArray();
+        byte[] data = key.getColumnQualifierData().toArray();
+        long timestamp = key.getTimestamp();
+        byte[] columnVisibility = key.getColumnVisibilityData().toArray();
+        byte[] valueBytes = value.get();
+
+        // main entity is either the subject or object
+        // data contains: column family, var name of other node, data of other node + datatype of object
+        int split = Bytes.indexOf(data, DELIM_BYTES);
+        byte[] columnFamily = Arrays.copyOf(data, split);
+        byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
+        split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
+        String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
+        byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length - 2);
+        byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length);
+        byte[] objectBytes;
+        RyaURI subject;
+        RyaURI predicate = new RyaURI(new String(predicateBytes));
+        RyaType object;
+        RyaURI context = null;
+        // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker}
+        //            or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker}
+        switch (otherNodeVar) {
+            case "subject":
+                subject = new RyaURI(new String(otherNodeBytes));
+                objectBytes = Bytes.concat(entityBytes, typeBytes);
+                break;
+            case "object":
+                subject = new RyaURI(new String(entityBytes));
+                objectBytes = Bytes.concat(otherNodeBytes, typeBytes);
+                break;
+            default:
+                throw new IOException("Failed to deserialize entity-centric index row. "
+                        + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'");
+        }
+        object = RyaContext.getInstance().deserialize(objectBytes);
+        if (columnFamily != null && columnFamily.length > 0) {
+            context = new RyaURI(new String(columnFamily));
+        }
+        return new RyaStatement(subject, predicate, object, context,
+                null, columnVisibility, valueBytes, timestamp);
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void setConnector(final Connector connector) {
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public void purge(final RdfCloudTripleStoreConfiguration configuration) {
+    }
+
+    @Override
+    public void dropAndDestroy() {
+    }
+
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        return null;
+    }
+}
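
For reference, the createMutations/deserializeStatement pair above round-trips a statement through
the two-row layout. A minimal sketch of writing a statement into the index (the URIs are
hypothetical, and it assumes the mvm.rya domain classes imported above; exception handling
omitted):

    RyaURI subj = new RyaURI("urn:example#alice");
    RyaURI pred = new RyaURI("urn:example#knows");
    RyaType obj = new RyaType("urn:example#bob");
    RyaStatement stmt = new RyaStatement(subj, pred, obj);
    // Yields one subject-keyed and one object-keyed mutation:
    Collection<Mutation> mutations = EntityCentricIndex.createMutations(stmt);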

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
new file mode 100644
index 0000000..2030e58
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java
@@ -0,0 +1,171 @@
+package mvm.rya.indexing.accumulo.entity;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+public class EntityLocalityGroupSetter {
+
+    String tablePrefix;
+    Connector conn;
+    Configuration conf;
+
+    public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) {
+        this.conn = conn;
+        this.tablePrefix = tablePrefix;
+        this.conf = conf;
+    }
+
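+    /**
+     * Scans the prospects table for rows beginning with "predicate\u0000" and returns an
+     * iterator over the predicate values extracted from those row keys.
+     */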
+    private Iterator<String> getPredicates() {
+
+        String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS);
+        final BatchScanner bs;
+        try {
+            bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10);
+        } catch (TableNotFoundException e) {
+            // Fail fast rather than continuing with a null scanner, which would NPE below.
+            throw new RuntimeException(e);
+        }
+        bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000"))));
+        final Iterator<Entry<Key,Value>> iter = bs.iterator();
+
+        return new Iterator<String>() {
+
+            private String next = null;
+            private boolean hasNextCalled = false;
+            private boolean isEmpty = false;
+
+            @Override
+            public boolean hasNext() {
+
+                if (!hasNextCalled && !isEmpty) {
+                    if (iter.hasNext()) {
+                        Entry<Key,Value> temp = iter.next();
+                        String row = temp.getKey().getRow().toString();
+                        String[] rowArray = row.split("\u0000");
+                        next = rowArray[1];
+
+                        hasNextCalled = true;
+                        return true;
+                    }
+                    isEmpty = true;
+                    return false;
+                } else if (isEmpty) {
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+
+            @Override
+            public String next() {
+
+                if (hasNextCalled) {
+                    hasNextCalled = false;
+                    return next;
+                } else if (isEmpty) {
+                    throw new NoSuchElementException();
+                } else {
+                    if (this.hasNext()) {
+                        hasNextCalled = false;
+                        return next;
+                    } else {
+                        throw new NoSuchElementException();
+                    }
+                }
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Cannot delete from iterator!");
+            }
+
+        };
+    }
+
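+    /**
+     * Assigns each predicate found in the prospects table to its own locality group on the
+     * document-partitioned index table, so that columns for one predicate are stored together.
+     */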
+    public void setLocalityGroups() {
+
+        HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>();
+        Iterator<String> groups = getPredicates();
+
+        int i = 1;
+
+        while (groups.hasNext()) {
+            HashSet<Text> tempColumn = new HashSet<Text>();
+            String temp = groups.next();
+            tempColumn.add(new Text(temp));
+            String groupName = "predicate" + i;
+            localityGroups.put(groupName, tempColumn);
+            i++;
+        }
+
+        try {
+            conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups);
+            //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true);
+        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
+}
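
Usage is a single call once the prospects table is populated. A minimal sketch (the "rya_" table
prefix is an assumption; ConfigUtils.getConnector is the helper used elsewhere in this module and
throws AccumuloException/AccumuloSecurityException):

    Connector conn = ConfigUtils.getConnector(conf);
    EntityLocalityGroupSetter setter = new EntityLocalityGroupSetter("rya_", conn, conf);
    setter.setLocalityGroups(); // one locality group per predicate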

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java
new file mode 100644
index 0000000..bb792ac
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityOptimizer.java
@@ -0,0 +1,434 @@
+package mvm.rya.indexing.accumulo.entity;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.persist.joinselect.SelectivityEvalDAO;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
+import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO;
+import mvm.rya.rdftriplestore.inference.DoNotExpandSP;
+import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.Dataset;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryOptimizer;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class EntityOptimizer implements QueryOptimizer, Configurable {
+
+    private SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval;
+    private RdfCloudTripleStoreConfiguration conf;
+    private boolean isEvalDaoSet = false;
+
+
+    public EntityOptimizer() {
+
+    }
+
+    public EntityOptimizer(RdfCloudTripleStoreConfiguration conf) {
+        if (conf.isUseStats() && conf.isUseSelectivity()) {
+            try {
+                eval = new AccumuloSelectivityEvalDAO(conf, ConfigUtils.getConnector(conf));
+                ((AccumuloSelectivityEvalDAO) eval).setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(conf), conf));
+                eval.init();
+            } catch (AccumuloException | AccumuloSecurityException e) {
+                e.printStackTrace();
+            }
+        } else {
+            eval = null;
+        }
+        isEvalDaoSet = true;
+        this.conf = conf;
+    }
+
+    public EntityOptimizer(SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> eval) {
+        this.eval = eval;
+        this.conf = eval.getConf();
+        isEvalDaoSet = true;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        if (conf instanceof RdfCloudTripleStoreConfiguration) {
+            this.conf = (RdfCloudTripleStoreConfiguration) conf;
+        } else {
+            this.conf = new AccumuloRdfConfiguration(conf);
+        }
+
+        if (!isEvalDaoSet) {
+            if (this.conf.isUseStats() && this.conf.isUseSelectivity()) {
+                try {
+                    eval = new AccumuloSelectivityEvalDAO(this.conf, ConfigUtils.getConnector(this.conf));
+                    ((AccumuloSelectivityEvalDAO) eval).setRdfEvalDAO(new ProspectorServiceEvalStatsDAO(ConfigUtils.getConnector(this.conf), this.conf));
+                    eval.init();
+                } catch (AccumuloException | AccumuloSecurityException e) {
+                    e.printStackTrace();
+                }
+            } else {
+                eval = null;
+            }
+            isEvalDaoSet = true;
+        }
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /**
+     * Groups statement patterns that share a common subject or object variable into star
+     * queries, replacing each group with an {@link EntityTupleSet} that delegates the star
+     * query to the entity-centric index, and reorders the join accordingly.
+     *
+     * @param tupleExpr
+     */
+    @Override
+    public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) {
+        tupleExpr.visit(new JoinVisitor());
+    }
+
+    protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> {
+
+        @Override
+        public void meet(Join node) {
+            try {
+                if (node.getLeftArg() instanceof FixedStatementPattern && node.getRightArg() instanceof DoNotExpandSP) {
+                    return;
+                }
+                List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>());
+                HashMultimap<String, StatementPattern> varMap = getVarBins(joinArgs);
+                while (!varMap.keySet().isEmpty()) {
+                    String s = getHighestPriorityKey(varMap);
+                    constructTuple(varMap, joinArgs, s);
+                }
+                List<TupleExpr> filterChain = getFilterChain(joinArgs);
+
+                for (TupleExpr te : joinArgs) {
+                    if (!(te instanceof StatementPattern) && !(te instanceof EntityTupleSet)) {
+                        te.visit(this);
+                    }
+                }
+                // Replace old join hierarchy
+                node.replaceWith(getNewJoin(joinArgs, filterChain));
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        private List<TupleExpr> getFilterChain(List<TupleExpr> joinArgs) {
+            List<TupleExpr> filterTopBottom = Lists.newArrayList();
+            TupleExpr filterChainTop = null;
+            TupleExpr filterChainBottom = null;
+
+            for(int i = 0; i < joinArgs.size(); i++) {
+                if(joinArgs.get(i) instanceof Filter) {
+                    if(filterChainTop == null) {
+                        filterChainTop = joinArgs.remove(i);
+                        i--;
+                    } else if(filterChainBottom == null){
+                        filterChainBottom = joinArgs.remove(i);
+                        ((Filter)filterChainTop).setArg(filterChainBottom);
+                        i--;
+                    } else {
+                        ((Filter)filterChainBottom).setArg(joinArgs.remove(i));
+                        filterChainBottom = ((Filter)filterChainBottom).getArg();
+                        i--;
+                    }
+                }
+            }
+            if(filterChainTop != null) {
+                filterTopBottom.add(filterChainTop);
+            }
+            if(filterChainBottom != null) {
+                filterTopBottom.add(filterChainBottom);
+            }
+            return filterTopBottom;
+        }
+
+        private TupleExpr getNewJoin(List<TupleExpr> joinArgs, List<TupleExpr> filterChain) {
+            TupleExpr newJoin;
+
+            if (joinArgs.size() > 1) {
+                if (filterChain.size() > 0) {
+                    TupleExpr finalJoinArg = joinArgs.remove(0);
+                    TupleExpr tempJoin;
+                    TupleExpr temp = filterChain.get(0);
+
+                    if (joinArgs.size() > 1) {
+                        tempJoin = new Join(joinArgs.remove(0), joinArgs.remove(0));
+                        for (TupleExpr te : joinArgs) {
+                            tempJoin = new Join(tempJoin, te);
+                        }
+                    } else {
+                        tempJoin = joinArgs.remove(0);
+                    }
+
+                    if (filterChain.size() == 1) {
+                        ((Filter) temp).setArg(tempJoin);
+                    } else {
+                        ((Filter) filterChain.get(1)).setArg(tempJoin);
+                    }
+                    newJoin = new Join(temp, finalJoinArg);
+                } else {
+                    newJoin = new Join(joinArgs.get(0), joinArgs.get(1));
+                    joinArgs.remove(0);
+                    joinArgs.remove(0);
+
+                    for (TupleExpr te : joinArgs) {
+                        newJoin = new Join(newJoin, te);
+                    }
+                }
+            } else if (joinArgs.size() == 1) {
+                if (filterChain.size() > 0) {
+                    newJoin = filterChain.get(0);
+                    if (filterChain.size() == 1) {
+                        ((Filter) newJoin).setArg(joinArgs.get(0));
+                    } else {
+                        ((Filter) filterChain.get(1)).setArg(joinArgs.get(0));
+                    }
+                } else {
+                    newJoin = joinArgs.get(0);
+                }
+            } else {
+                throw new IllegalStateException("JoinArgs size cannot be zero.");
+            }
+            return newJoin;
+        }
+
+        private HashMultimap<String, StatementPattern> getVarBins(List<TupleExpr> nodes) {
+
+            HashMultimap<String, StatementPattern> varMap = HashMultimap.create();
+
+            for (QueryModelNode node : nodes) {
+                if (node instanceof StatementPattern) {
+                    StatementPattern sp = (StatementPattern) node;
+                    if (sp.getPredicateVar().isConstant()) {
+                        varMap.put(sp.getSubjectVar().getName(), sp);
+                        varMap.put(sp.getObjectVar().getName(), sp);
+                    }
+                }
+            }
+
+            removeInvalidBins(varMap, true);
+
+            return varMap;
+        }
+
+        private void updateVarMap(HashMultimap<String, StatementPattern> varMap, Set<StatementPattern> bin) {
+
+            for (StatementPattern sp : bin) {
+                varMap.remove(sp.getSubjectVar().getName(), sp);
+                varMap.remove(sp.getObjectVar().getName(), sp);
+            }
+
+            removeInvalidBins(varMap, false);
+
+        }
+
+        private void removeInvalidBins(HashMultimap<String, StatementPattern> varMap, boolean newMap) {
+
+            Set<String> keys = Sets.newHashSet(varMap.keySet());
+
+            if (newMap) {
+                for (String s : keys) {
+                    Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
+                    if (!StarQuery.isValidStarQuery(spSet)) {
+                        for (StatementPattern sp : spSet) {
+                            varMap.remove(s, sp);
+                        }
+                    }
+                }
+            } else {
+                for (String s : keys) {
+                    Set<StatementPattern> spSet = Sets.newHashSet(varMap.get(s));
+                    if (spSet.size() == 1) {
+                        for (StatementPattern sp : spSet) {
+                            varMap.remove(s, sp);
+                        }
+                    }
+                }
+            }
+        }
+
+        private void constructTuple(HashMultimap<String, StatementPattern> varMap, List<TupleExpr> joinArgs,
+                String binName) {
+
+            Set<StatementPattern> bin = Sets.newHashSet(varMap.get(binName));
+            StarQuery sq = new StarQuery(bin);
+
+            updateVarMap(varMap, bin);
+            for (StatementPattern sp : bin) {
+                joinArgs.remove(sp);
+            }
+
+            joinArgs.add(new EntityTupleSet(sq, conf));
+
+        }
+
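+        /**
+         * Scores each variable's bin by bin size, estimated join cardinality, and the minimum
+         * cardinality of any pattern in the bin, weighting bins keyed by a constant more
+         * heavily, and returns the highest-scoring variable.
+         */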
+        private String getHighestPriorityKey(HashMultimap<String, StatementPattern> varMap) {
+
+            double tempPriority = -1;
+            double priority = -Double.MAX_VALUE;
+            String priorityKey = "";
+            Set<StatementPattern> bin = null;
+
+            Set<String> keys = varMap.keySet();
+
+            for (String s : keys) {
+                bin = varMap.get(s);
+                tempPriority = bin.size();
+                tempPriority *= getCardinality(bin);
+                tempPriority *= getMinCardSp(bin);
+
+                // weight starQuery where common Var is constant slightly more -- this factor is subject
+                // to change
+                if(s.startsWith("-const-")) {
+                    tempPriority *= 10;
+                }
+                if (tempPriority > priority) {
+                    priority = tempPriority;
+                    priorityKey = s;
+                }
+            }
+            return priorityKey;
+        }
+
+        private double getMinCardSp(Collection<StatementPattern> nodes) {
+
+            double cardinality = Double.MAX_VALUE;
+            double tempCard = -1;
+
+            if (eval == null) {
+                return 1;
+            }
+
+            for (StatementPattern sp : nodes) {
+
+                try {
+                    tempCard = eval.getCardinality(conf, sp);
+
+                    if (tempCard < cardinality) {
+                        cardinality = tempCard;
+
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+            }
+
+            return cardinality;
+
+        }
+
+        private double getCardinality(Collection<StatementPattern> spNodes) {
+
+            double cardinality = Double.MAX_VALUE;
+            double tempCard = -1;
+
+
+            if(eval == null) {
+                return 1;
+            }
+
+            List<StatementPattern> nodes = Lists.newArrayList(spNodes);
+
+            AccumuloSelectivityEvalDAO ase = (AccumuloSelectivityEvalDAO) eval;
+            ase.setDenormalized(true);
+
+            try {
+
+                for (int i = 0; i < nodes.size(); i++) {
+                    for (int j = i + 1; j < nodes.size(); j++) {
+                        tempCard = ase.getJoinSelect(conf, nodes.get(i), nodes.get(j));
+                        if (tempCard < cardinality) {
+                            cardinality = tempCard;
+                        }
+                    }
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            ase.setDenormalized(false);
+
+            return cardinality / (nodes.size() + 1);
+
+        }
+
+        protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) {
+            if (tupleExpr instanceof Join) {
+                if (!(((Join) tupleExpr).getLeftArg() instanceof FixedStatementPattern)
+                        && !(((Join) tupleExpr).getRightArg() instanceof DoNotExpandSP)) {
+                    Join join = (Join) tupleExpr;
+                    getJoinArgs(join.getLeftArg(), joinArgs);
+                    getJoinArgs(join.getRightArg(), joinArgs);
+                }
+            } else if(tupleExpr instanceof Filter) {
+                joinArgs.add(tupleExpr);
+                getJoinArgs(((Filter)tupleExpr).getArg(), joinArgs);
+            } else {
+                joinArgs.add(tupleExpr);
+            }
+
+            return joinArgs;
+        }
+
+    }
+
+}
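
Since EntityOptimizer implements Sesame's QueryOptimizer, it can be applied directly to a parsed
query's tuple expression. A minimal sketch (the SPARQL string is hypothetical; a configuration
pointing at a running Rya instance is assumed for EntityTupleSet evaluation, and exception
handling for MalformedQueryException is omitted):

    TupleExpr expr = new SPARQLParser()
            .parseQuery("SELECT ?s WHERE { ?s <urn:p1> ?o1 . ?s <urn:p2> ?o2 }", null)
            .getTupleExpr();
    new EntityOptimizer(conf).optimize(expr, null, new EmptyBindingSet());
    // Statement patterns sharing ?s are now grouped into an EntityTupleSet node.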
