http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
new file mode 100644
index 0000000..59e7611
--- /dev/null
+++ 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
@@ -0,0 +1,182 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.INSTANCE;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PASSWORD;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.USERNAME;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.ZOOKEEPERS;
+
+import java.io.IOException;
+
+import mvm.rya.api.resolver.triple.TripleRow;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+/**
+ * Static helpers that configure the Hadoop jobs of the join-selectivity pipeline and convert a
+ * {@link TripleRow} into an Accumulo {@link Mutation}.
+ *
+ * Every job initializer that talks to Accumulo reads its connection settings (user name,
+ * password, instance name, zookeepers) from the job's {@link Configuration} under the
+ * USERNAME, PASSWORD, INSTANCE and ZOOKEEPERS keys of JoinSelectConstants.
+ */
+public class JoinSelectStatsUtil {
+
+  /**
+   * Configures a job that reads a sequence file and writes mutations to an Accumulo table.
+   *
+   * @param job the job to configure; its configuration supplies the Accumulo connection settings
+   * @param inputPath path of the sequence file input
+   * @param outtable default Accumulo table the job writes to
+   * @param auths not used by this method
+   * @throws AccumuloSecurityException declared by the Accumulo connector setup
+   * @throws IOException declared by the input path registration
+   * @throws IllegalArgumentException if the configuration has no zookeepers entry
+   */
+  public static void initSumMRJob(Job job, String inputPath, String outtable, String auths)
+      throws AccumuloSecurityException, IOException {
+    Configuration conf = job.getConfiguration();
+    String username = conf.get(USERNAME);
+    String password = conf.get(PASSWORD);
+    String instance = conf.get(INSTANCE);
+    String zookeepers = conf.get(ZOOKEEPERS);
+
+    if (zookeepers == null) {
+      throw new IllegalArgumentException("Must specify zookeepers");
+    }
+    AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+    AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+    // INPUT
+    SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setMapOutputKeyClass(TripleEntry.class);
+    job.setMapOutputValueClass(CardList.class);
+
+    // OUTPUT
+    AccumuloOutputFormat.setDefaultTableName(job, outtable);
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Mutation.class);
+  }
+
+  /**
+   * Configures a job that scans one Accumulo table and writes mutations to another.
+   *
+   * @param job the job to configure; its configuration supplies the Accumulo connection settings
+   * @param intable Accumulo table to scan
+   * @param outtable default Accumulo table the job writes to
+   * @param auths scan authorizations, passed to {@link Authorizations}
+   * @throws AccumuloSecurityException declared by the Accumulo connector setup
+   * @throws IllegalArgumentException if the configuration has no zookeepers entry
+   */
+  public static void initTableMRJob(Job job, String intable, String outtable, String auths)
+      throws AccumuloSecurityException {
+    Configuration conf = job.getConfiguration();
+    String username = conf.get(USERNAME);
+    String password = conf.get(PASSWORD);
+    String instance = conf.get(INSTANCE);
+    String zookeepers = conf.get(ZOOKEEPERS);
+
+    // Print the value the label names (the original printed the scan authorizations here).
+    System.out.println("Zookeepers are " + zookeepers);
+
+    if (zookeepers == null) {
+      throw new IllegalArgumentException("Must specify either mock or zookeepers");
+    }
+    AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
+    AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+    // INPUT
+    AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
+    AccumuloInputFormat.setInputTableName(job, intable);
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+
+    // OUTPUT
+    AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+    AccumuloOutputFormat.setDefaultTableName(job, outtable);
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Mutation.class);
+  }
+
+  /**
+   * Configures a job that scans an Accumulo table and writes a sequence file of
+   * {@code CompositeType}/{@code TripleCard} pairs.
+   *
+   * @param job the job to configure; its configuration supplies the Accumulo connection settings
+   * @param intable Accumulo table to scan
+   * @param outpath output path of the sequence file
+   * @param auths scan authorizations, passed to {@link Authorizations}
+   * @throws AccumuloSecurityException declared by the Accumulo connector setup
+   * @throws IllegalArgumentException if the configuration has no zookeepers entry
+   */
+  public static void initTabToSeqFileJob(Job job, String intable, String outpath, String auths)
+      throws AccumuloSecurityException {
+    Configuration conf = job.getConfiguration();
+    String username = conf.get(USERNAME);
+    String password = conf.get(PASSWORD);
+    String instance = conf.get(INSTANCE);
+    String zookeepers = conf.get(ZOOKEEPERS);
+
+    // Print the value the label names (the original printed the scan authorizations here).
+    System.out.println("Zookeepers are " + zookeepers);
+
+    if (zookeepers == null) {
+      throw new IllegalArgumentException("Must specify either mock or zookeepers");
+    }
+    AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+    // INPUT
+    AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+    AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
+    AccumuloInputFormat.setInputTableName(job, intable);
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    job.setMapOutputKeyClass(CompositeType.class);
+    job.setMapOutputValueClass(TripleCard.class);
+
+    // OUTPUT
+    SequenceFileOutputFormat.setOutputPath(job, new Path(outpath));
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(CompositeType.class);
+    job.setOutputValueClass(TripleCard.class);
+  }
+
+  /**
+   * Configures a job that reads two sequence-file inputs with the same mapper and writes a
+   * sequence file of {@code TripleEntry}/{@code CardList} pairs.
+   *
+   * @param job the job to configure
+   * @param prospectsPath first sequence-file input path
+   * @param spoPath second sequence-file input path
+   * @param mapperClass mapper registered for both inputs
+   * @param outPath output path of the sequence file
+   * @param auths not used by this method
+   * @throws AccumuloSecurityException declared for callers; nothing in this method raises it
+   */
+  public static void initJoinMRJob(Job job, String prospectsPath, String spoPath,
+      Class<? extends Mapper<CompositeType,TripleCard,?,?>> mapperClass,
+      String outPath, String auths) throws AccumuloSecurityException {
+
+    // INPUT
+    MultipleInputs.addInputPath(job, new Path(prospectsPath), SequenceFileInputFormat.class, mapperClass);
+    MultipleInputs.addInputPath(job, new Path(spoPath), SequenceFileInputFormat.class, mapperClass);
+    job.setMapOutputKeyClass(CompositeType.class);
+    job.setMapOutputValueClass(TripleCard.class);
+
+    // OUTPUT
+    SequenceFileOutputFormat.setOutputPath(job, new Path(outPath));
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(TripleEntry.class);
+    job.setOutputValueClass(CardList.class);
+  }
+
+  /**
+   * Builds a single-entry mutation from a triple row.
+   *
+   * A null column family or qualifier becomes EMPTY_TEXT, a null visibility becomes EMPTY_CV and
+   * a null value becomes EMPTY_VALUE. The row's timestamp is written only when it is non-null.
+   *
+   * @param tripleRow source of the row id, column, visibility, timestamp and value
+   * @return a mutation for the row holding exactly one put
+   */
+  public static Mutation createMutation(TripleRow tripleRow) {
+    Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+
+    byte[] columnVisibility = tripleRow.getColumnVisibility();
+    ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+    Long timestamp = tripleRow.getTimestamp();
+    byte[] value = tripleRow.getValue();
+    Value v = value == null ? EMPTY_VALUE : new Value(value);
+    byte[] columnQualifier = tripleRow.getColumnQualifier();
+    Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+    byte[] columnFamily = tripleRow.getColumnFamily();
+    Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+    if (timestamp != null) {
+      mutation.put(cfText, cqText, cv, timestamp.longValue(), v);
+    } else {
+      // No timestamp on the row: use the put overload that takes none.
+      mutation.put(cfText, cqText, cv, v);
+    }
+    return mutation;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
new file mode 100644
index 0000000..3843600
--- /dev/null
+++ 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
@@ -0,0 +1,144 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Writable that carries either a {@link CardinalityType} or a {@link TripleEntry}, never both.
+ *
+ * The payload is copied into an internal scratch instance by the setters and by
+ * {@link #readFields(DataInput)}; the getters return that scratch instance, so a later set or
+ * read overwrites what an earlier getter returned. An instance created with the no-arg
+ * constructor holds no payload until one of those calls is made.
+ */
+public class TripleCard implements WritableComparable<TripleCard> {
+
+  // Currently held payload; at most one of the two is non-null.
+  private CardinalityType card = null;
+  private TripleEntry te = null;
+
+  // Scratch instances that are overwritten in place and handed out as the payload.
+  private final CardinalityType tempCard = new CardinalityType();
+  private final TripleEntry tempTe = new TripleEntry();
+
+  /** Creates an instance without a payload. */
+  public TripleCard() {}
+
+  /**
+   * Creates an instance holding a copy of the given cardinality.
+   *
+   * @param card cardinality to copy
+   */
+  public TripleCard(CardinalityType card) {
+    this.setCard(card);
+  }
+
+  /**
+   * Creates an instance holding a copy of the given triple entry.
+   *
+   * @param te triple entry to copy
+   */
+  public TripleCard(TripleEntry te) {
+    this.setTE(te);
+  }
+
+  /**
+   * Copies the given cardinality into this instance and drops any triple entry.
+   *
+   * @param card cardinality to copy
+   */
+  public void setCard(CardinalityType card) {
+    tempCard.set(card);
+    this.card = tempCard;
+    this.te = null;
+  }
+
+  /**
+   * Copies the given triple entry into this instance and drops any cardinality.
+   *
+   * @param te triple entry to copy
+   */
+  public void setTE(TripleEntry te) {
+    tempTe.setTE(te);
+    this.te = tempTe;
+    this.card = null;
+  }
+
+  /** @return the held cardinality, or null when this instance does not hold one */
+  public CardinalityType getCard() {
+    return this.card;
+  }
+
+  /** @return the held triple entry, or null when this instance does not hold one */
+  public TripleEntry getTE() {
+    return this.te;
+  }
+
+  /** @return true when no cardinality is held */
+  public boolean isCardNull() {
+    return (card == null);
+  }
+
+  /** @return true when no triple entry is held */
+  public boolean isTeNull() {
+    return (te == null);
+  }
+
+  /**
+   * Writes a boolean marker (true for a cardinality, false for a triple entry) followed by the
+   * payload.
+   *
+   * @throws IllegalStateException if no payload is held; nothing is written in that case
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (card != null) {
+      out.writeBoolean(true);
+      card.write(out);
+    } else if (te != null) {
+      out.writeBoolean(false);
+      te.write(out);
+    } else {
+      // Fail before emitting the marker so the stream is not left with half a record.
+      throw new IllegalStateException("TripleCard holds neither a cardinality nor a triple entry");
+    }
+  }
+
+  /** Reads the boolean marker and then the matching payload, replacing whatever was held. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      tempCard.readFields(in);
+      card = tempCard;
+      te = null;
+    } else {
+      tempTe.readFields(in);
+      te = tempTe;
+      card = null;
+    }
+  }
+
+  /** Hash of the held payload; an instance without a payload hashes to a fixed value. */
+  @Override
+  public int hashCode() {
+    int result = 7 * 17;
+    if (card != null) {
+      result += card.hashCode();
+    } else if (te != null) {
+      result += te.hashCode();
+    }
+    return result;
+  }
+
+  /**
+   * Two instances are equal when they hold the same kind of payload and the payloads are equal;
+   * two instances without a payload are equal to each other.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TripleCard)) {
+      return false;
+    }
+    TripleCard comp = (TripleCard) o;
+    if (card != null) {
+      return card.equals(comp.card);
+    }
+    if (te != null) {
+      return te.equals(comp.te);
+    }
+    return comp.card == null && comp.te == null;
+  }
+
+  /** String form of the held payload, or the empty string when there is none. */
+  @Override
+  public String toString() {
+    if (card != null) {
+      return card.toString();
+    }
+    if (te != null) {
+      return te.toString();
+    }
+    return "";
+  }
+
+  /**
+   * Orders cardinality holders before triple-entry holders before empty instances, and instances
+   * of the same kind by their payload.
+   */
+  @Override
+  public int compareTo(TripleCard o) {
+    int thisKind = payloadKind();
+    int otherKind = o.payloadKind();
+    if (thisKind != otherKind) {
+      return thisKind < otherKind ? -1 : 1;
+    }
+    if (card != null) {
+      return card.compareTo(o.card);
+    }
+    if (te != null) {
+      return te.compareTo(o.te);
+    }
+    return 0;
+  }
+
+  // Rank used by compareTo: 0 = cardinality, 1 = triple entry, 2 = no payload.
+  private int payloadKind() {
+    if (card != null) {
+      return 0;
+    }
+    return te != null ? 1 : 2;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
new file mode 100644
index 0000000..8f3769c
--- /dev/null
+++ 
b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
@@ -0,0 +1,179 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Writable made of five {@link Text} fields: two entry values, the position label of each, and
+ * a key position label.
+ *
+ * Comparison order is first, firstPos, second, secondPos, keyPos. The {@code Text} constructor
+ * and {@code setEntry}/{@code setPosition} keep the references they are given, while
+ * {@code setTE} copies the contents of the other entry into the fields already held.
+ */
+public class TripleEntry implements WritableComparable<TripleEntry> {
+
+  private Text first;
+  private Text second;
+  private Text firstPos;
+  private Text secondPos;
+  private Text keyPos;
+
+  /** Creates an entry whose five fields are fresh, empty {@code Text} instances. */
+  public TripleEntry() {
+    this(new Text(), new Text(), new Text(), new Text(), new Text());
+  }
+
+  /** Creates an entry whose fields are new {@code Text} instances built from the strings. */
+  public TripleEntry(String first, String second, String firstPos, String secondPos, String keyPos) {
+    this(new Text(first), new Text(second), new Text(firstPos), new Text(secondPos), new Text(keyPos));
+  }
+
+  /** Creates an entry that holds the given {@code Text} references without copying them. */
+  public TripleEntry(Text first, Text second, Text firstPos, Text secondPos, Text keyPos) {
+    this.first = first;
+    this.second = second;
+    this.firstPos = firstPos;
+    this.secondPos = secondPos;
+    this.keyPos = keyPos;
+  }
+
+  /** Replaces the two entry values with the given references. */
+  public void setEntry(Text first, Text second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /** Replaces the three position labels with the given references. */
+  public void setPosition(Text firstPos, Text secondPos, Text keyPos) {
+    this.firstPos = firstPos;
+    this.secondPos = secondPos;
+    this.keyPos = keyPos;
+  }
+
+  /** Copies the contents of all five fields of {@code te} into this entry's own fields. */
+  public void setTE(TripleEntry te) {
+    first.set(te.first);
+    second.set(te.second);
+    firstPos.set(te.firstPos);
+    secondPos.set(te.secondPos);
+    keyPos.set(te.keyPos);
+  }
+
+  public Text getFirst() {
+    return first;
+  }
+
+  public Text getSecond() {
+    return second;
+  }
+
+  public Text getFirstPos() {
+    return firstPos;
+  }
+
+  public Text getSecondPos() {
+    return secondPos;
+  }
+
+  public Text getKeyPos() {
+    return keyPos;
+  }
+
+  /** Writes the fields in the order first, second, firstPos, secondPos, keyPos. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    for (Text field : new Text[] {first, second, firstPos, secondPos, keyPos}) {
+      field.write(out);
+    }
+  }
+
+  /** Reads the fields in the same order {@link #write(DataOutput)} emits them. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    for (Text field : new Text[] {first, second, firstPos, secondPos, keyPos}) {
+      field.readFields(in);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 7;
+    for (Text field : new Text[] {first, second, firstPos, secondPos, keyPos}) {
+      hash = hash * 17 + field.hashCode();
+    }
+    return hash;
+  }
+
+  /** Entries are equal when all five fields are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TripleEntry)) {
+      return false;
+    }
+    TripleEntry other = (TripleEntry) o;
+    if (!first.equals(other.first)) {
+      return false;
+    }
+    if (!second.equals(other.second)) {
+      return false;
+    }
+    if (!firstPos.equals(other.firstPos)) {
+      return false;
+    }
+    if (!secondPos.equals(other.secondPos)) {
+      return false;
+    }
+    return keyPos.equals(other.keyPos);
+  }
+
+  /** Tab-separated rendering: first, firstPos, second, secondPos, keyPos. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(first).append("\t");
+    sb.append(firstPos).append("\t");
+    sb.append(second).append("\t");
+    sb.append(secondPos).append("\t");
+    sb.append(keyPos);
+    return sb.toString();
+  }
+
+  /** Compares field by field; the first field that differs decides the result. */
+  @Override
+  public int compareTo(TripleEntry o) {
+    Text[] mine = {first, firstPos, second, secondPos, keyPos};
+    Text[] theirs = {o.first, o.firstPos, o.second, o.secondPos, o.keyPos};
+    for (int i = 0; i < mine.length - 1; i++) {
+      int cmp = mine[i].compareTo(theirs[i]);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+    return mine[mine.length - 1].compareTo(theirs[theirs.length - 1]);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
 
b/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
new file mode 100644
index 0000000..38258c1
--- /dev/null
+++ 
b/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
@@ -0,0 +1 @@
+mvm.rya.prospector.plans.impl.CountPlan
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
 
b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
new file mode 100644
index 0000000..f3ef96e
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
@@ -0,0 +1,178 @@
+package mvm.rya.prospector.mr
+
+import com.google.common.collect.Iterators
+import com.google.common.collect.Lists
+import mvm.rya.accumulo.AccumuloRyaDAO
+import mvm.rya.accumulo.AccumuloRdfConfiguration
+import mvm.rya.api.persist.RdfEvalStatsDAO
+import mvm.rya.api.domain.RyaStatement
+import mvm.rya.api.domain.RyaType
+import mvm.rya.api.domain.RyaURI
+import mvm.rya.prospector.domain.IndexEntry
+import mvm.rya.prospector.domain.TripleValueType
+import mvm.rya.prospector.service.ProspectorService
+import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO
+import mvm.rya.prospector.utils.ProspectorConstants
+import org.apache.accumulo.core.client.Instance
+import org.apache.accumulo.core.client.mock.MockInstance
+import org.apache.accumulo.core.security.Authorizations
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ToolRunner
+import org.junit.Test
+import org.openrdf.model.vocabulary.XMLSchema
+import org.openrdf.model.impl.URIImpl
+
+import static org.junit.Assert.assertEquals
+import org.openrdf.model.impl.LiteralImpl
+import org.openrdf.model.Value
+
+/**
+ * Runs the Prospector tool through ToolRunner against an Accumulo MockInstance and checks
+ * what ProspectorService reads back from the "rya_prospects" table: one prospect, and one
+ * COUNT index entry for each queried term.
+ *
+ * Date: 12/4/12
+ * Time: 4:33 PM
+ */
+class ProspectorTest {
+
+    @Test
+    public void testCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+        def connector = mock.getConnector("user", "pass".bytes)
+        def intable = "rya_spo"
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+               connector.tableOperations().create(outtable)
+               
+        AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+        ryaDAO.setConnector(connector);
+        ryaDAO.init()
+
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new 
RyaURI("urn:gem#pred"), new RyaType("mydata1")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new 
RyaURI("urn:gem#pred"), new RyaType("mydata2")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new 
RyaURI("urn:gem#pred"), new RyaType("12")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new 
RyaURI("urn:gem#pred"), new RyaType(XMLSchema.INTEGER, "12")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new 
RyaURI("urn:gem#pred1"), new RyaType("12")))
+
+        def confFile = "stats_cluster_config.xml"
+        def confPath = new 
Path(getClass().getClassLoader().getResource(confFile).toString())
+        def args = (String[]) [confPath];
+        ToolRunner.run(new Prospector(), args);
+        debugTable(connector, outtable)
+        
+        def scanner = connector.createScanner(outtable, new 
Authorizations("U", "FOUO"))
+        def iter = scanner.iterator()
+//        assertEquals(11, Iterators.size(iter))
+
+        ryaDAO.destroy()
+               
+        def conf = new Configuration()
+        conf.addResource(confPath)
+      //  debugTable(mrInfo, outtable)
+
+        def service = new ProspectorService(connector, outtable)
+        def auths = (String[]) ["U", "FOUO"]
+        def prospects = service.getProspects(auths)
+        def plist = Lists.newArrayList(prospects)
+        assertEquals(1, plist.size())
+               
+               def rdfConf = new AccumuloRdfConfiguration(conf)
+               rdfConf.setAuths("U","FOUO")
+
+        prospects = service.getProspectsInRange(System.currentTimeMillis() - 
100000, System.currentTimeMillis() + 10000, auths)
+        plist = Lists.newArrayList(prospects)
+        assertEquals(1, plist.size())
+               
+               List<String> queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem:etype");
+        def query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.entity.name(), queryTerms, XMLSchema.ANYURI.stringValue(), 
auths)
+        assertEquals(1, query.size())
+//        assertEquals(
+//                new IndexEntry(index: ProspectorConstants.COUNT, data: 
"urn:gem:etype", dataType: XMLSchema.ANYURI.stringValue(),
+//                        tripleValueType: TripleValueType.entity, visibility: 
"", count: -1, timestamp: plist.get(0)),
+//                query.get(0))
+               
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem:etype#1234");
+        query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.subject.name(), queryTerms, XMLSchema.ANYURI.stringValue(), 
auths)
+        assertEquals(1, query.size())
+               
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem#pred");
+        query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.predicate.name(), queryTerms, XMLSchema.ANYURI.stringValue(), 
auths)
+        assertEquals(1, query.size())
+        assertEquals(
+                new IndexEntry(index: ProspectorConstants.COUNT, data: 
"urn:gem#pred", dataType: XMLSchema.ANYURI.stringValue(),
+                        tripleValueType: TripleValueType.predicate, 
visibility: "", count: 4l, timestamp: plist.get(0)),
+                query.get(0))
+               
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("mydata1");
+        query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.object.name(), queryTerms, XMLSchema.STRING.stringValue(), 
auths)
+        assertEquals(1, query.size())
+//        assertEquals(
+//                new IndexEntry(index: ProspectorConstants.COUNT, data: 
"mydata1", dataType: XMLSchema.STRING.stringValue(),
+//                        tripleValueType: TripleValueType.object, visibility: 
"", count: -1, timestamp: plist.get(0)),
+//                query.get(0))
+               
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem:etype#1234");
+               queryTerms.add("urn:gem#pred");
+               query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.subjectpredicate.name(), queryTerms, 
XMLSchema.STRING.stringValue(), auths)
+               assertEquals(1, query.size())
+//             assertEquals(
+//                             new IndexEntry(index: 
ProspectorConstants.COUNT, data: "urn:gem:etype#1234" + "\u0000" + 
"urn:gem#pred", dataType: XMLSchema.STRING.stringValue(),
+//                                             tripleValueType: 
TripleValueType.subjectpredicate, visibility: "", count: -1, timestamp: 
plist.get(0)),
+//                             query.get(0))
+
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem#pred");
+               queryTerms.add("12");
+               query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.predicateobject.name(), queryTerms, 
XMLSchema.STRING.stringValue(), auths)
+               assertEquals(1, query.size())
+//             assertEquals(
+//                             new IndexEntry(index: 
ProspectorConstants.COUNT, data: "urn:gem#pred" + "\u0000" + "12", dataType: 
XMLSchema.STRING.stringValue(),
+//                                             tripleValueType: 
TripleValueType.predicateobject, visibility: "", count: -1, timestamp: 
plist.get(0)),
+//                             query.get(0))
+
+               queryTerms = new ArrayList<String>();
+               queryTerms.add("urn:gem:etype#1234");
+               queryTerms.add("mydata1");
+               query = service.query(plist, ProspectorConstants.COUNT, 
TripleValueType.subjectobject.name(), queryTerms, 
XMLSchema.STRING.stringValue(), auths)
+               
+               assertEquals(1, query.size())
+//             assertEquals(
+//                             new IndexEntry(index: 
ProspectorConstants.COUNT, data: "urn:gem:etype#1234" + "\u0000" + "mydata1", 
dataType: XMLSchema.STRING.stringValue(),
+//                                             tripleValueType: 
TripleValueType.subjectobject, visibility: "", count: -1, timestamp: 
plist.get(0)),
+//                             query.get(0))
+
+        // TODO: move this cleanup into a teardown method so it also runs when an assertion above fails
+        connector.tableOperations().delete(outtable)
+    }
+
+    private void debugTable(def connector, String table) {
+        connector.createScanner(table, new Authorizations((String[]) ["U", 
"FOUO"])).iterator().each {
+            println it
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
 
b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
new file mode 100644
index 0000000..275c6d5
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
@@ -0,0 +1,182 @@
+package mvm.rya.prospector.service
+
+import com.google.common.collect.Iterators
+import mvm.rya.accumulo.AccumuloRdfConfiguration
+import mvm.rya.accumulo.AccumuloRyaDAO
+import mvm.rya.api.domain.RyaStatement
+import mvm.rya.api.domain.RyaType
+import mvm.rya.api.domain.RyaURI
+import mvm.rya.api.persist.RdfEvalStatsDAO
+import mvm.rya.prospector.mr.Prospector
+import org.apache.accumulo.core.client.Instance
+import org.apache.accumulo.core.client.mock.MockInstance
+import org.apache.accumulo.core.security.Authorizations
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ToolRunner
+import org.junit.Test
+import org.openrdf.model.impl.URIImpl
+import org.openrdf.model.vocabulary.XMLSchema
+
+import static org.junit.Assert.assertEquals
+import org.openrdf.model.impl.LiteralImpl
+import org.openrdf.model.Value
+
+/**
+ * Date: 1/26/13
+ * Time: 3:00 PM
+ */
+class ProspectorServiceEvalStatsDAOTest {
+
+    /**
+     * Loads five statements through the Rya DAO, runs the Prospector job
+     * configured by stats_cluster_config.xml, and checks the cardinalities
+     * reported by ProspectorServiceEvalStatsDAO for a predicate, an object
+     * that was stored, and an object that was never stored (expected -1).
+     */
+    @Test
+    public void testCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+        def connector = mock.getConnector("user", "pass".bytes)
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+        connector.tableOperations().create(outtable)
+
+        // The output table is dropped in the finally block so that a failing
+        // assertion does not leave it behind.
+        try {
+            AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+            ryaDAO.setConnector(connector);
+            ryaDAO.init()
+
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata2")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred"), new RyaType(XMLSchema.INTEGER, "12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred1"), new RyaType("12")))
+
+            def confFile = "stats_cluster_config.xml"
+            def confPath = new Path(getClass().getClassLoader().getResource(confFile).toString())
+            def args = (String[]) [confPath];
+            ToolRunner.run(new Prospector(), args);
+            debugTable(connector, outtable)
+
+            // Scanner and iterator feed the size assertion that is currently disabled.
+            def scanner = connector.createScanner(outtable, new Authorizations("U", "FOUO"))
+            def iter = scanner.iterator()
+//            assertEquals(11, Iterators.size(iter))
+
+            ryaDAO.destroy()
+
+            def conf = new Configuration()
+            conf.addResource(confPath)
+//            debugTable(connector, outtable)
+
+            def rdfConf = new AccumuloRdfConfiguration(conf)
+            rdfConf.setAuths("U","FOUO")
+            def evalDao = new ProspectorServiceEvalStatsDAO(connector, rdfConf)
+            evalDao.init()
+
+            // Four of the five statements use the predicate urn:gem#pred.
+            List<Value> values = new ArrayList<Value>();
+            values.add( new URIImpl("urn:gem#pred"));
+
+            def count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.PREDICATE, values)
+            assertEquals(4.0, count, 0.001);
+
+            // Exactly one statement has the object "mydata1".
+            values = new ArrayList<Value>();
+            values.add( new LiteralImpl("mydata1"));
+
+            count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+            assertEquals(1.0, count, 0.001);
+
+            // "mydata3" was never stored; the test expects -1 for it.
+            values = new ArrayList<Value>();
+            values.add( new LiteralImpl("mydata3"));
+
+            count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+            assertEquals(-1.0, count, 0.001);
+        } finally {
+            connector.tableOperations().delete(outtable)
+        }
+    }
+
+    /**
+     * Same scenario as the cardinality test with explicit auths, except that
+     * no authorizations are set on the AccumuloRdfConfiguration; instead the
+     * mock user is created with the "U" and "FOUO" authorizations. The same
+     * cardinalities are expected.
+     */
+    @Test
+    public void testNoAuthsCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+        def connector = mock.getConnector("user", "pass".bytes)
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+        connector.tableOperations().create(outtable)
+        connector.securityOperations().createUser("user", "pass".bytes, new Authorizations("U", "FOUO"))
+
+        // The output table is dropped in the finally block so that a failing
+        // assertion does not leave it behind.
+        try {
+            AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+            ryaDAO.setConnector(connector);
+            ryaDAO.init()
+
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata2")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred"), new RyaType(XMLSchema.INTEGER, "12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred1"), new RyaType("12")))
+
+            def confFile = "stats_cluster_config.xml"
+            def confPath = new Path(getClass().getClassLoader().getResource(confFile).toString())
+            def args = (String[]) [confPath];
+            ToolRunner.run(new Prospector(), args);
+
+            // Scanner and iterator feed the size assertion that is currently disabled.
+            def scanner = connector.createScanner(outtable, new Authorizations("U", "FOUO"))
+            def iter = scanner.iterator()
+//            assertEquals(11, Iterators.size(iter))
+
+            ryaDAO.destroy()
+
+            def conf = new Configuration()
+            conf.addResource(confPath)
+
+            // Deliberately no setAuths call on the configuration in this test.
+            def rdfConf = new AccumuloRdfConfiguration(conf)
+//            rdfConf.setAuths("U","FOUO")
+            def evalDao = new ProspectorServiceEvalStatsDAO(connector, rdfConf)
+            evalDao.init()
+
+            // Four of the five statements use the predicate urn:gem#pred.
+            List<Value> values = new ArrayList<Value>();
+            values.add( new URIImpl("urn:gem#pred"));
+            def count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.PREDICATE, values)
+            assertEquals(4.0, count, 0.001);
+
+            // Exactly one statement has the object "mydata1".
+            values = new ArrayList<Value>();
+            values.add( new LiteralImpl("mydata1"));
+            count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+            assertEquals(1.0, count, 0.001);
+
+            // "mydata3" was never stored; the test expects -1 for it.
+            values = new ArrayList<Value>();
+            values.add( new LiteralImpl("mydata3"));
+
+            count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+            assertEquals(-1.0, count, 0.001);
+        } finally {
+            connector.tableOperations().delete(outtable)
+        }
+    }
+
+    /**
+     * Prints every entry of the given table to standard output, scanning
+     * with the "U" and "FOUO" authorizations.
+     */
+    private void debugTable(def connector, String table) {
+        def auths = new Authorizations((String[]) ["U", "FOUO"])
+        def rows = connector.createScanner(table, auths).iterator()
+        while (rows.hasNext()) {
+            println rows.next()
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java
new file mode 100644
index 0000000..007af96
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java
@@ -0,0 +1,591 @@
+package mvm.rya.joinselect;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.layout.TablePrefixLayoutStrategy;
+import mvm.rya.api.persist.RdfEvalStatsDAO;
+import mvm.rya.joinselect.AccumuloSelectivityEvalDAO;
+import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.helpers.StatementPatternCollector;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.Lists;
+
+public class AccumuloSelectivityEvalDAOTest {
+
+    /** Separator placed between the components of the row ids written by these tests. */
+    private static final String DELIM = "\u0000";
+    private final byte[] EMPTY_BYTE = new byte[0];
+    /** Empty value used for entries whose payload lives in the column qualifier. */
+    private final Value EMPTY_VAL = new Value(EMPTY_BYTE);
+
+    /** Query whose patterns all have a variable subject and constant predicate and object. */
+    private String q1 = ""//
+            + "SELECT ?h  " //
+            + "{" //
+            + "  ?h <uri:howlsAt>  <uri:moon>. "//
+            + "  ?h <http://www.w3.org/2000/01/rdf-schema#label> <uri:dog> ."//
+            + "  ?h <uri:barksAt> <uri:cat> ."//
+            + "  ?h <uri:peesOn> <uri:hydrant> . "//
+            + "}";//
+
+    /** Query whose patterns all have a variable predicate and constant subject and object. */
+    private String q2 = ""//
+            + "SELECT ?h  " //
+            + "{" //
+            + "   <uri:howlsAt> ?h  <uri:moon>. "//
+            + "   <http://www.w3.org/2000/01/rdf-schema#label> ?h <uri:dog> ."//
+            + "   <uri:barksAt> ?h <uri:cat> ."//
+            + "   <uri:peesOn> ?h <uri:hydrant> . "//
+            + "}";//
+
+    /** Query mixing the variable position: object, object, predicate, subject. */
+    private String q3 = ""//
+            + "SELECT ?h  " //
+            + "{" //
+            + "   <uri:howlsAt> <uri:moon>  ?h. "//
+            + "   <http://www.w3.org/2000/01/rdf-schema#label> <uri:dog> ?h  ."//
+            + "   <uri:barksAt> ?h <uri:cat> ."//
+            + "   ?h <uri:peesOn> <uri:hydrant> . "//
+            + "}";//
+
+    // Fixture state, rebuilt by init() before every test.
+    private Connector conn;
+    AccumuloRdfConfiguration arc;
+    RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> res;
+    BatchWriterConfig config;
+    Instance mock;
+
+    /**
+     * Rebuilds the shared fixture: a mock Accumulo connector, the batch
+     * writer settings, and the configuration / statistics DAO pair. The
+     * "rya_prospects" and "rya_selectivity" tables are removed if they
+     * already exist.
+     */
+    @Before
+    public void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException {
+
+        mock = new MockInstance("accumulo");
+        conn = mock.getConnector("user", new PasswordToken("pass".getBytes()));
+
+        config = new BatchWriterConfig();
+        config.setMaxMemory(1000);
+        config.setMaxLatency(1000, TimeUnit.SECONDS);
+        config.setMaxWriteThreads(10);
+
+        // Remove both statistics tables when present, prospects first.
+        for (String tableName : Arrays.asList("rya_prospects", "rya_selectivity")) {
+            if (conn.tableOperations().exists(tableName)) {
+                conn.tableOperations().delete(tableName);
+            }
+        }
+
+        arc = new AccumuloRdfConfiguration();
+        res = new ProspectorServiceEvalStatsDAO(conn, arc);
+        arc.setTableLayoutStrategy(new TablePrefixLayoutStrategy());
+        arc.setMaxRangesForScanner(300);
+
+    }
+
+    @Test
+    public void testInitialize() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException, TableExistsException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setConnector(conn);
+        accc.setRdfEvalDAO(res);
+        accc.init();
+
+        TableOperations tos = conn.tableOperations();
+        Assert.assertTrue(tos.exists("rya_prospects") && 
tos.exists("rya_selectivity"));
+        Assert.assertTrue(accc.isInitialized());
+        Assert.assertTrue(accc.getConf().equals(arc));
+        Assert.assertTrue(accc.getConnector().equals(conn));
+        Assert.assertTrue(accc.getRdfEvalDAO().equals(res));
+
+    }
+
+    /**
+     * Writes "predicateobject" count rows to rya_prospects (six rows per
+     * predicate/object pair, each with a different numeric suffix and count)
+     * plus the full-table cardinality to rya_selectivity, then checks the
+     * cardinality the DAO reports for each statement pattern of q1.
+     */
+    @Test
+    public void testCardinalityQuery1() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
+            MalformedQueryException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setRdfEvalDAO(res);
+        accc.setConnector(conn);
+        accc.init();
+
+        BatchWriter bw = conn.createBatchWriter("rya_prospects", config);
+
+        BatchWriter bw1 = conn.createBatchWriter("rya_selectivity", config);
+        Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality"));
+        m.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL);
+        List<Mutation> list = Lists.newArrayList();
+        list.add(m);
+        bw1.addMutations(list);
+        bw1.close();
+
+        String s1 = "predicateobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog";
+        String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat";
+        String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant";
+        List<Mutation> mList = new ArrayList<Mutation>();
+        Mutation m1, m2, m3;
+
+        for (int i = 1; i < 7; i++) {
+            // s1 and s2 rows carry 5*i, s3 rows carry 10-i; the row suffixes
+            // run in different directions for each prefix.
+            String count = Integer.toString(5 * i);
+            String count2 = Integer.toString(10 - i);
+            m1 = new Mutation(s1 + DELIM + i);
+            m1.put(new Text("count"), new Text(""), new Value(count.getBytes()));
+            m2 = new Mutation(s2 + DELIM + (7 - i));
+            m2.put(new Text("count"), new Text(""), new Value(count.getBytes()));
+            m3 = new Mutation(s3 + DELIM + (10 + i));
+            m3.put(new Text("count"), new Text(""), new Value(count2.getBytes()));
+            mList.add(m1);
+            mList.add(m2);
+            mList.add(m3);
+        }
+
+        bw.addMutations(mList);
+        bw.close();
+
+        List<StatementPattern> spList = getSpList(q1);
+        long c1 = accc.getCardinality(arc, spList.get(0));
+        long c2 = accc.getCardinality(arc, spList.get(1));
+        long c3 = accc.getCardinality(arc, spList.get(2));
+        long c4 = accc.getCardinality(arc, spList.get(3));
+
+        // No rows were written for the first pattern (uri:howlsAt / uri:moon).
+        Assert.assertEquals(0L, c1);
+        Assert.assertEquals(5L, c2);
+        Assert.assertEquals(30L, c3);
+        Assert.assertEquals(9L, c4);
+
+    }
+
+    /**
+     * Writes "subjectobject" count rows to rya_prospects (six rows per
+     * subject/object pair, each with a different numeric suffix and count)
+     * plus the full-table cardinality to rya_selectivity, then checks the
+     * cardinality the DAO reports for each statement pattern of q2.
+     */
+    @Test
+    public void testCardinalityQuery2() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
+            MalformedQueryException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setConnector(conn);
+        accc.setRdfEvalDAO(res);
+        accc.init();
+
+        BatchWriter bw = conn.createBatchWriter("rya_prospects", config);
+
+        BatchWriter bw1 = conn.createBatchWriter("rya_selectivity", config);
+        Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality"));
+        m.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL);
+        List<Mutation> list = Lists.newArrayList();
+        list.add(m);
+        bw1.addMutations(list);
+        bw1.close();
+
+        String s1 = "subjectobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog";
+        String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat";
+        String s3 = "subjectobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant";
+        List<Mutation> mList = new ArrayList<Mutation>();
+        Mutation m1, m2, m3;
+
+        for (int i = 1; i < 7; i++) {
+            // s1 and s2 rows carry 5*i, s3 rows carry 10-i; the row suffixes
+            // run in different directions for each prefix.
+            String count = Integer.toString(5 * i);
+            String count2 = Integer.toString(10 - i);
+            m1 = new Mutation(s1 + DELIM + i);
+            m1.put(new Text("count"), new Text(""), new Value(count.getBytes()));
+            m2 = new Mutation(s2 + DELIM + (7 - i));
+            m2.put(new Text("count"), new Text(""), new Value(count.getBytes()));
+            m3 = new Mutation(s3 + DELIM + (10 + i));
+            m3.put(new Text("count"), new Text(""), new Value(count2.getBytes()));
+            mList.add(m1);
+            mList.add(m2);
+            mList.add(m3);
+        }
+        bw.addMutations(mList);
+        bw.close();
+
+        List<StatementPattern> spList = getSpList(q2);
+        long c1 = accc.getCardinality(arc, spList.get(0));
+        long c2 = accc.getCardinality(arc, spList.get(1));
+        long c3 = accc.getCardinality(arc, spList.get(2));
+        long c4 = accc.getCardinality(arc, spList.get(3));
+
+        // No rows were written for the first pattern (uri:howlsAt / uri:moon).
+        Assert.assertEquals(0L, c1);
+        Assert.assertEquals(5L, c2);
+        Assert.assertEquals(30L, c3);
+        Assert.assertEquals(9L, c4);
+
+    }
+
+    @Test
+    public void testJoinCardinalityQuery1() throws AccumuloException, 
AccumuloSecurityException, TableExistsException,
+            TableNotFoundException, MalformedQueryException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setConnector(conn);
+        accc.setRdfEvalDAO(res);
+        accc.init();
+
+        BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config);
+        BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config);
+
+        String s1 = "predicateobject" + DELIM + 
"http://www.w3.org/2000/01/rdf-schema#label"; + DELIM + "uri:dog";
+        String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + 
"uri:cat";
+        String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + 
"uri:hydrant";
+        List<Mutation> mList = new ArrayList<Mutation>();
+        List<Mutation> mList2 = new ArrayList<Mutation>();
+        List<String> sList = Arrays.asList("subjectobject", 
"subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate",
+                "predicatesubject");
+        Mutation m1, m2, m3, m4;
+
+        m1 = new Mutation(s1 + DELIM + "1");
+        m1.put(new Text("count"), new Text(""), new Value("20".getBytes()));
+        m2 = new Mutation(s2 + DELIM + "2");
+        m2.put(new Text("count"), new Text(""), new Value("15".getBytes()));
+        m3 = new Mutation(s3 + DELIM + "3");
+        m3.put(new Text("count"), new Text(""), new Value("10".getBytes()));
+        mList.add(m1);
+        mList.add(m2);
+        mList.add(m3);
+
+        bw1.addMutations(mList);
+        bw1.close();
+
+        m1 = new Mutation(s1);
+        m2 = new Mutation(s2);
+        m3 = new Mutation(s3);
+        int i = 30;
+        int j = 60;
+        int k = 90;
+        Long count1;
+        Long count2;
+        Long count3;
+
+        for (String s : sList) {
+            count1 = (long) i;
+            count2 = (long) j;
+            count3 = (long) k;
+            m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL);
+            m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL);
+            m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL);
+            i = 2 * i;
+            j = 2 * j;
+            k = 2 * k;
+        }
+        m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + 
"FullTableCardinality"));
+        m4.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL);
+        mList2.add(m1);
+        mList2.add(m2);
+        mList2.add(m3);
+        mList2.add(m4);
+        bw2.addMutations(mList2);
+        bw2.close();
+
+        Scanner scan = conn.createScanner("rya_selectivity", new 
Authorizations());
+        scan.setRange(new Range());
+
+        for (Map.Entry<Key, Value> entry : scan) {
+            System.out.println("Key row string is " + 
entry.getKey().getRow().toString());
+            System.out.println("Key is " + entry.getKey());
+            System.out.println("Value is " + (new 
String(entry.getKey().getColumnQualifier().toString())));
+
+        }
+
+        List<StatementPattern> spList = getSpList(q1);
+        System.out.println(spList);
+        List<Double> jCardList = new ArrayList<Double>();
+
+        for (StatementPattern sp1 : spList) {
+            for (StatementPattern sp2 : spList) {
+                jCardList.add(accc.getJoinSelect(arc, sp1, sp2));
+            }
+        }
+
+        System.out.println("Join cardinalities are " + jCardList);
+
+        Assert.assertEquals(0, jCardList.get(0), .001);
+        Assert.assertEquals(0, jCardList.get(3), .001);
+        Assert.assertEquals(6.0 / 600, jCardList.get(5), .001);
+        Assert.assertEquals(6.0 / 600, jCardList.get(6), .001);
+        Assert.assertEquals(0 / 600, jCardList.get(8), .001);
+        Assert.assertEquals(6.0 / 600, jCardList.get(7), .001);
+        Assert.assertEquals(15.0 / 600, jCardList.get(11), .001);
+        Assert.assertEquals(6.0 / 600, jCardList.get(13), .001);
+        Assert.assertEquals(10.0 / 600, jCardList.get(15), .001);
+
+        Assert.assertTrue(jCardList.get(0) == 0);
+        Assert.assertTrue(jCardList.get(3) == 0);
+        Assert.assertTrue(jCardList.get(5) == .01);
+        Assert.assertTrue(jCardList.get(6) == .01);
+        Assert.assertTrue(jCardList.get(8) == 0);
+        Assert.assertTrue(jCardList.get(7) == (6.0 / 600));
+        Assert.assertTrue(jCardList.get(11) == (1.0 / 40));
+        Assert.assertTrue(jCardList.get(13) == .01);
+        Assert.assertTrue(jCardList.get(15) == (10.0 / 600));
+
+    }
+
+    /**
+     * Seeds rya_prospects with "subjectobject" count rows and rya_selectivity
+     * with "objectsubject" rows holding per-join-type counts (doubling for
+     * each entry of sList) plus a full-table cardinality of 600, then checks
+     * the join selectivity reported for pairs of q2's statement patterns.
+     *
+     * jCardList is filled row-major over spList x spList, so index
+     * 4 * a + b holds the result for (spList[a], spList[b]).
+     */
+    @Test
+    public void testJoinCardinalityQuery2() throws AccumuloException, AccumuloSecurityException, TableExistsException,
+            TableNotFoundException, MalformedQueryException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setConnector(conn);
+        accc.setRdfEvalDAO(res);
+        accc.init();
+
+        BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config);
+        BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config);
+
+        String s1 = "subjectobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog";
+        String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat";
+        String s3 = "subjectobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant";
+        String s4 = "objectsubject" + DELIM + "uri:dog" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label";
+        String s5 = "objectsubject" + DELIM + "uri:cat" + DELIM + "uri:barksAt";
+        String s6 = "objectsubject" + DELIM + "uri:hydrant" + DELIM + "uri:peesOn";
+        List<String> sList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate",
+                "predicatesubject");
+        List<Mutation> mList = new ArrayList<Mutation>();
+        List<Mutation> mList2 = new ArrayList<Mutation>();
+        Mutation m1, m2, m3, m4;
+
+        m1 = new Mutation(s1 + DELIM + "1");
+        m1.put(new Text("count"), new Text(""), new Value("2".getBytes()));
+        m2 = new Mutation(s2 + DELIM + "2");
+        m2.put(new Text("count"), new Text(""), new Value("4".getBytes()));
+        m3 = new Mutation(s3 + DELIM + "3");
+        m3.put(new Text("count"), new Text(""), new Value("6".getBytes()));
+        mList.add(m1);
+        mList.add(m2);
+        mList.add(m3);
+
+        bw1.addMutations(mList);
+        bw1.close();
+
+        m1 = new Mutation(s4);
+        m2 = new Mutation(s5);
+        m3 = new Mutation(s6);
+        int i = 5;
+        int j = 6;
+        int k = 7;
+
+        // The count is stored in the column qualifier; it doubles for each join type.
+        for (String s : sList) {
+            m1.put(new Text(s), new Text(Long.toString(i)), EMPTY_VAL);
+            m2.put(new Text(s), new Text(Long.toString(j)), EMPTY_VAL);
+            m3.put(new Text(s), new Text(Long.toString(k)), EMPTY_VAL);
+            i = 2 * i;
+            j = 2 * j;
+            k = 2 * k;
+        }
+        m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality"));
+        m4.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL);
+        mList2.add(m1);
+        mList2.add(m2);
+        mList2.add(m3);
+        mList2.add(m4);
+        bw2.addMutations(mList2);
+        bw2.close();
+
+        List<StatementPattern> spList = getSpList(q2);
+        // System.out.println(spList);
+        List<Double> jCardList = new ArrayList<Double>();
+
+        for (StatementPattern sp1 : spList) {
+            for (StatementPattern sp2 : spList) {
+                jCardList.add(accc.getJoinSelect(arc, sp1, sp2));
+            }
+        }
+
+        System.out.println("Join cardinalities are " + jCardList);
+
+        Assert.assertEquals(0, jCardList.get(0), .001);
+        Assert.assertEquals(0, jCardList.get(3), .001);
+        Assert.assertEquals(2.0 / 600, jCardList.get(5), .001);
+        Assert.assertEquals(4.0 / 600, jCardList.get(6), .001);
+        Assert.assertEquals(.0 / 600, jCardList.get(8), .001);
+        Assert.assertEquals(6. / 600, jCardList.get(7), .001);
+        Assert.assertEquals(6. / 600, jCardList.get(11), .001);
+
+        // Exact comparisons retained from the original test; they are
+        // stricter than the tolerance-based checks above.
+        Assert.assertTrue(jCardList.get(0) == 0);
+        Assert.assertTrue(jCardList.get(3) == 0);
+        Assert.assertTrue(jCardList.get(5) == (1.0 / 300));
+        Assert.assertTrue(jCardList.get(6) == (4.0 / 600));
+        Assert.assertTrue(jCardList.get(8) == 0);
+        Assert.assertTrue(jCardList.get(7) == .01);
+        Assert.assertTrue(jCardList.get(11) == .01);
+
+    }
+
+    @Test
+    public void testJoinCardinalityQuery3() throws AccumuloException, 
AccumuloSecurityException, TableExistsException,
+            TableNotFoundException, MalformedQueryException {
+
+        AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO();
+        accc.setConf(arc);
+        accc.setConnector(conn);
+        accc.setRdfEvalDAO(res);
+        accc.init();
+
+        BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config);
+        BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config);
+
+        String s1 = "subjectpredicate" + DELIM + 
"http://www.w3.org/2000/01/rdf-schema#label"; + DELIM + "uri:dog";
+        String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + 
"uri:cat";
+        String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + 
"uri:hydrant";
+        String s4 = "subjectpredicate" + DELIM + "uri:howlsAt" + DELIM + 
"uri:moon";
+        String s5 = "objectsubject" + DELIM + "uri:cat" + DELIM + 
"uri:barksAt";
+
+        List<String> sList = Arrays.asList("subjectobject", "objectsubject", 
"objectobject", "objectpredicate", "subjectpredicate",
+                "subjectsubject", "predicateobject", "predicatepredicate", 
"predicatesubject");
+        List<Mutation> mList = new ArrayList<Mutation>();
+        List<Mutation> mList2 = new ArrayList<Mutation>();
+        Mutation m1, m2, m3, m4, m5, m6;
+
+        m1 = new Mutation(s1 + DELIM + "1");
+        m1.put(new Text("count"), new Text(""), new Value("15".getBytes()));
+        m2 = new Mutation(s2 + DELIM + "2");
+        m2.put(new Text("count"), new Text(""), new Value("11".getBytes()));
+        m3 = new Mutation(s3 + DELIM + "3");
+        m3.put(new Text("count"), new Text(""), new Value("13".getBytes()));
+        m4 = new Mutation(s4 + DELIM + "8");
+        m4.put(new Text("count"), new Text(""), new Value("20".getBytes()));
+        m5 = new Mutation(s4 + DELIM + "2");
+        m5.put(new Text("count"), new Text(""), new Value("10".getBytes()));
+
+        mList.add(m1);
+        mList.add(m2);
+        mList.add(m3);
+        mList.add(m4);
+        mList.add(m5);
+
+        bw1.addMutations(mList);
+        bw1.close();
+
+        m1 = new Mutation(s1);
+        m2 = new Mutation(s5);
+        m3 = new Mutation(s3);
+        m4 = new Mutation(s4);
+        int i = 5;
+        int j = 6;
+        int k = 7;
+        int l = 8;
+        Long count1;
+        Long count2;
+        Long count3;
+        Long count4;
+
+        for (String s : sList) {
+            count1 = (long) i;
+            count2 = (long) j;
+            count3 = (long) k;
+            count4 = (long) l;
+            m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL);
+            m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL);
+            m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL);
+            m4.put(new Text(s), new Text(count4.toString()), EMPTY_VAL);
+            i = 2 * i;
+            j = 2 * j;
+            k = 2 * k;
+            l = 2 * l;
+        }
+        m5 = new Mutation(new Text("subjectpredicateobject" + DELIM + 
"FullTableCardinality"));
+        m5.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL);
+        mList2.add(m1);
+        mList2.add(m2);
+        mList2.add(m3);
+        mList2.add(m4);
+        mList2.add(m5);
+        bw2.addMutations(mList2);
+        bw2.close();
+
+        List<StatementPattern> spList = getSpList(q3);
+        System.out.println(spList);
+        List<Double> jCardList = new ArrayList<Double>();
+
+        for (StatementPattern sp1 : spList) {
+            for (StatementPattern sp2 : spList) {
+                jCardList.add(accc.getJoinSelect(arc, sp1, sp2));
+            }
+        }
+
+        MathContext mc = new MathContext(3);
+
+        Assert.assertEquals(3.2 / 600, jCardList.get(0), .001);
+        Assert.assertEquals(0.5384615384615384 / 600, jCardList.get(3), .001);
+        Assert.assertEquals(1.3333333333333333 / 600, jCardList.get(5), .001);
+        Assert.assertEquals(2.6666666666666665 / 600, jCardList.get(6), .001);
+        Assert.assertEquals(6.4 / 600, jCardList.get(8), .001);
+        Assert.assertEquals(13. / 600, jCardList.get(15), .001);
+
+        Assert.assertTrue(new 
BigDecimal(jCardList.get(2)).round(mc).equals(new BigDecimal(64.0 / 
6000).round(mc)));
+        Assert.assertTrue(new 
BigDecimal(jCardList.get(7)).round(mc).equals(new BigDecimal(7.0 / 
7800).round(mc)));
+        Assert.assertTrue(new 
BigDecimal(jCardList.get(14)).round(mc).equals(new BigDecimal(112.0 / 
7800).round(mc)));
+
+    }
+
+    /**
+     * Parses the given SPARQL text (with a null base URI) and collects the
+     * statement patterns found in its tuple expression.
+     *
+     * @param query SPARQL query text
+     * @return the statement patterns gathered by StatementPatternCollector
+     * @throws MalformedQueryException if the parser rejects the query
+     */
+    private List<StatementPattern> getSpList(String query) throws MalformedQueryException {
+        ParsedQuery parsed = new SPARQLParser().parseQuery(query, null);
+        return StatementPatternCollector.process(parsed.getTupleExpr());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java
new file mode 100644
index 0000000..eb480ad
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java
@@ -0,0 +1,140 @@
+package mvm.rya.joinselect.mr;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import mvm.rya.joinselect.mr.utils.CardList;
+import mvm.rya.joinselect.mr.utils.TripleEntry;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.junit.Test;
+
+public class CardinalityIdentityReducerTest {
+  /*
+   * MRUnit ReduceDriver tests for
+   * JoinSelectStatisticsSum.CardinalityIdentityReducer. Every test feeds one
+   * TripleEntry key with two CardList values and expects three
+   * (Text(""), Mutation) outputs, in the order m1, m2, m3.
+   *
+   * DELIM is the NUL character placed between the position label(s) and the
+   * constant value(s) when the expected row ids are built.
+   */
+  private static final String DELIM = "\u0000";
+
+  @Test
+  public void testCIReducerOneConstant() throws InterruptedException, 
IOException {
+    /*
+     * One-constant case. The expected row is getFirstPos() + DELIM +
+     * getFirst(). The three expected column families are getKeyPos()
+     * followed by "subject", "predicate" and "object"; their qualifiers are
+     * cl.getcardS(), cl.getcardP() and cl.getcardO(), where cl = (5, 7, 9, ...)
+     * is the element-wise sum of cL1 and cL2. Values are empty byte arrays.
+     * NOTE(review): the TripleEntry constructor order is presumably
+     * (first, second, firstPos, secondPos, keyPos) -- confirm.
+     */
+    TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new 
Text(""), new Text("subject"), new Text(""), new Text("object"));
+    CardList cL1 = new CardList(1, 2, 3, 0, 0, 0);
+    CardList cL2 = new CardList(4, 5, 6, 0, 0, 0);
+    CardList cl = new CardList(5, 7, 9, 0, 0, 0);
+    List<CardList> list = new ArrayList<CardList>();
+    list.add(cL1);
+    list.add(cL2);
+
+    Text row = new Text(te.getFirstPos().toString() + DELIM + 
te.getFirst().toString());
+    Mutation m1 = new Mutation(row);
+    m1.put(new Text(te.getKeyPos().toString() + "subject"), new 
Text(cl.getcardS().toString()), new Value(new byte[0]));
+    Mutation m2 = new Mutation(row);
+    m2.put(new Text(te.getKeyPos().toString() + "predicate"), new 
Text(cl.getcardP().toString()), new Value(new byte[0]));
+    Mutation m3 = new Mutation(row);
+    m3.put(new Text(te.getKeyPos().toString() + "object"), new 
Text(cl.getcardO().toString()), new Value(new byte[0]));
+    Text table = new Text("");
+
+    new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new 
JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list)
+        .withOutput(table, m1).withOutput(table, m2).withOutput(table, 
m3).runTest();
+
+  }
+
+  @Test
+  public void testCIReducerTwoConstant() throws InterruptedException, 
IOException {
+    /*
+     * Two-constant case. The expected row is getFirstPos() + getSecondPos()
+     * + DELIM + getFirst() + DELIM + getSecond(). Families and qualifiers
+     * are built exactly as in the one-constant test: getKeyPos() followed by
+     * "subject" / "predicate" / "object", with the summed S, P and O counts.
+     */
+    TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new 
Text("urn:gem#pred"), new Text("subject"), new Text("predicate"), new 
Text("object"));
+    CardList cL1 = new CardList(1, 2, 3, 0, 0, 0);
+    CardList cL2 = new CardList(4, 5, 6, 0, 0, 0);
+    CardList cl = new CardList(5, 7, 9, 0, 0, 0);
+    List<CardList> list = new ArrayList<CardList>();
+    list.add(cL1);
+    list.add(cL2);
+
+    Text row = new Text(te.getFirstPos().toString() + 
te.getSecondPos().toString() + DELIM + te.getFirst().toString() + DELIM + 
te.getSecond());
+    Mutation m1 = new Mutation(row);
+    m1.put(new Text(te.getKeyPos().toString() + "subject"), new 
Text(cl.getcardS().toString()), new Value(new byte[0]));
+    Mutation m2 = new Mutation(row);
+    m2.put(new Text(te.getKeyPos().toString() + "predicate"), new 
Text(cl.getcardP().toString()), new Value(new byte[0]));
+    Mutation m3 = new Mutation(row);
+    m3.put(new Text(te.getKeyPos().toString() + "object"), new 
Text(cl.getcardO().toString()), new Value(new byte[0]));
+    Text table = new Text("");
+
+    new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new 
JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list)
+        .withOutput(table, m1).withOutput(table, m2).withOutput(table, 
m3).runTest();
+
+  }
+
+  @Test
+  public void testJoinTwoVars() throws InterruptedException, IOException {
+    /*
+     * Join-cardinality case: the inputs carry their counts in the last three
+     * CardList slots. The expected families are getKeyPos() followed by
+     * "subjectpredicate", "predicateobject" and "objectsubject", with
+     * qualifiers cl.getcardSP(), cl.getcardPO() and cl.getcardSO() taken
+     * from the element-wise sum cl = (..., 5, 7, 9).
+     */
+    TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new 
Text(""), new Text("subject"), new Text(""), new Text("predicateobject"));
+    CardList cL1 = new CardList(0, 0, 0, 1, 2, 3);
+    CardList cL2 = new CardList(0, 0, 0, 4, 5, 6);
+    CardList cl = new CardList(0, 0, 0, 5, 7, 9);
+    List<CardList> list = new ArrayList<CardList>();
+    list.add(cL1);
+    list.add(cL2);
+
+    Text row = new Text(te.getFirstPos().toString() + DELIM + 
te.getFirst().toString());
+    Mutation m1 = new Mutation(row);
+    m1.put(new Text(te.getKeyPos().toString() + "subjectpredicate"), new 
Text(cl.getcardSP().toString()), new Value(new byte[0]));
+    Mutation m2 = new Mutation(row);
+    m2.put(new Text(te.getKeyPos().toString() + "predicateobject"), new 
Text(cl.getcardPO().toString()), new Value(new byte[0]));
+    Mutation m3 = new Mutation(row);
+    m3.put(new Text(te.getKeyPos().toString() + "objectsubject"), new 
Text(cl.getcardSO().toString()), new Value(new byte[0]));
+    Text table = new Text("");
+
+    new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new 
JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list)
+        .withOutput(table, m1).withOutput(table, m2).withOutput(table, 
m3).runTest();
+
+  }
+
+  @Test
+  public void testJoinTwoVarsReverseOrder() throws InterruptedException, 
IOException {
+    /*
+     * Same counts as testJoinTwoVars, but the last TripleEntry argument is
+     * "objectpredicate". The expected families are hard-coded here: each
+     * starts with the literal "predicateobject" (not with getKeyPos()) and
+     * ends with "predicatesubject", "objectpredicate" or "subjectobject",
+     * i.e. the two-part labels of testJoinTwoVars with their parts swapped.
+     * Qualifiers are still getcardSP(), getcardPO() and getcardSO().
+     */
+    TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new 
Text(""), new Text("subject"), new Text(""), new Text("objectpredicate"));
+    CardList cL1 = new CardList(0, 0, 0, 1, 2, 3);
+    CardList cL2 = new CardList(0, 0, 0, 4, 5, 6);
+    CardList cl = new CardList(0, 0, 0, 5, 7, 9);
+    List<CardList> list = new ArrayList<CardList>();
+    list.add(cL1);
+    list.add(cL2);
+
+    Text row = new Text(te.getFirstPos().toString() + DELIM + 
te.getFirst().toString());
+    Mutation m1 = new Mutation(row);
+    m1.put(new Text("predicateobject" + "predicatesubject"), new 
Text(cl.getcardSP().toString()), new Value(new byte[0]));
+    Mutation m2 = new Mutation(row);
+    m2.put(new Text("predicateobject" + "objectpredicate"), new 
Text(cl.getcardPO().toString()), new Value(new byte[0]));
+    Mutation m3 = new Mutation(row);
+    m3.put(new Text("predicateobject" + "subjectobject"), new 
Text(cl.getcardSO().toString()), new Value(new byte[0]));
+    Text table = new Text("");
+
+    new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new 
JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list)
+        .withOutput(table, m1).withOutput(table, m2).withOutput(table, 
m3).runTest();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java
new file mode 100644
index 0000000..8647294
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java
@@ -0,0 +1,75 @@
+package mvm.rya.joinselect.mr;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import mvm.rya.joinselect.mr.JoinSelectProspectOutput;
+import mvm.rya.joinselect.mr.utils.CardinalityType;
+import mvm.rya.joinselect.mr.utils.CompositeType;
+import mvm.rya.joinselect.mr.utils.TripleCard;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Test;
+
+public class CardinalityMapperTest {
+  /*
+   * MRUnit MapDriver test for JoinSelectProspectOutput.CardinalityMapper.
+   *
+   * DELIM is the NUL character separating the parts of each input row.
+   * TripleValueType supplies the label that starts each row; only subject,
+   * predicate and subjectpredicate are used by this test.
+   */
+  private static final String DELIM = "\u0000";
+
+  public enum TripleValueType {
+    subject, predicate, object, subjectpredicate, predicateobject, 
subjectobject
+  }
+
+  @Test
+  public void testOutput() throws InterruptedException, IOException {
+    /*
+     * Three entries are mapped. Each input row is
+     * label + DELIM + constant(s) + DELIM + trailing number, and each value
+     * holds a count written as decimal digits. For every entry the expected
+     * output is CompositeType(constant(s), 1) paired with a TripleCard that
+     * wraps CardinalityType(count, label, trailing number).
+     */
+    String s = "urn:gem:etype#1234";
+    String p = "urn:gem#pred";
+
+    Text t1 = new Text(TripleValueType.subject.name() + DELIM + s + DELIM + 1);
+    Text t2 = new Text(TripleValueType.predicate.name() + DELIM + p + DELIM + 
2);
+    Text t3 = new Text(TripleValueType.subjectpredicate.name() + DELIM + s + 
DELIM + p + DELIM + 3);
+    // b is reused for the empty key fields; c, d, e use String.getBytes(), i.e. the platform default charset.
+    byte[] b = new byte[0];
+    byte[] c = "25".getBytes();
+    byte[] d = "47".getBytes();
+    byte[] e = "15".getBytes();
+    // NOTE(review): Text.getBytes() exposes the backing array, documented as valid only up to getLength() -- confirm no padding reaches the Key row.
+    Key key1 = new Key(t1.getBytes(), b, b, b, 1);
+    Key key2 = new Key(t2.getBytes(), b, b, b, 1);
+    Key key3 = new Key(t3.getBytes(), b, b, b, 1);
+    Value val1 = new Value(c);
+    Value val2 = new Value(d);
+    Value val3 = new Value(e);
+
+    // System.out.println("Keys are " + key1 + " and " + key2);
+
+    new MapDriver<Key,Value,CompositeType,TripleCard>().withMapper(new 
JoinSelectProspectOutput.CardinalityMapper()).withInput(key1, val1)
+        .withInput(key2, val2).withInput(key3, val3).withOutput(new 
CompositeType(s, 1), new TripleCard(new CardinalityType(25, "subject", 1)))
+        .withOutput(new CompositeType(p, 1), new TripleCard(new 
CardinalityType(47, "predicate", 2)))
+        .withOutput(new CompositeType(s + DELIM + p, 1), new TripleCard(new 
CardinalityType(15, "subjectpredicate", 3))).runTest();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java
new file mode 100644
index 0000000..c070488
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java
@@ -0,0 +1,63 @@
+package mvm.rya.joinselect.mr;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import mvm.rya.joinselect.mr.FullTableSize;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.junit.Test;
+
+//TODO fix table names!
+
+public class FullTableSizeTest {
+
+    /** NUL character between the two parts of the expected row id. */
+    private static final String DELIM = "\u0000";
+
+    /** Number of dummy entries pushed through the map/combine/reduce pipeline. */
+    private static final int ENTRY_COUNT = 15;
+
+    /**
+     * Runs FullTableMapper, FullTableCombiner and FullTableReducer over the
+     * keys "entry1" .. "entry15" (all with an empty value) and expects one
+     * output: key Text("") with a mutation on row
+     * "subjectpredicateobject" + DELIM + "FullTableCardinality" whose
+     * family is "FullTableCardinality" and whose qualifier is "15".
+     *
+     * @throws IOException declared for the MRUnit driver calls
+     */
+    @Test
+    public void testFullTableSize() throws IOException {
+
+        final Value emptyValue = new Value(new byte[0]);
+
+        final Text expectedRow = new Text("subjectpredicateobject" + DELIM + "FullTableCardinality");
+        final Mutation expected = new Mutation(expectedRow);
+        expected.put(new Text("FullTableCardinality"), new Text("15"), new Value(new byte[0]));
+
+        final MapReduceDriver<Key, Value, Text, IntWritable, Text, Mutation> driver =
+                new MapReduceDriver<Key, Value, Text, IntWritable, Text, Mutation>();
+        driver.withMapper(new FullTableSize.FullTableMapper());
+        driver.withCombiner(new FullTableSize.FullTableCombiner());
+        driver.withReducer(new FullTableSize.FullTableReducer());
+
+        // Inputs are registered in ascending order: entry1, entry2, ..., entry15.
+        for (int i = 1; i <= ENTRY_COUNT; i++) {
+            driver.withInput(new Key(new Text("entry" + i)), emptyValue);
+        }
+
+        driver.withOutput(new Text(""), expected).runTest();
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java
new file mode 100644
index 0000000..5d2acb7
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java
@@ -0,0 +1,123 @@
+package mvm.rya.joinselect.mr;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import mvm.rya.joinselect.mr.JoinSelectAggregate;
+import mvm.rya.joinselect.mr.utils.CardList;
+import mvm.rya.joinselect.mr.utils.CardinalityType;
+import mvm.rya.joinselect.mr.utils.CompositeType;
+import mvm.rya.joinselect.mr.utils.TripleCard;
+import mvm.rya.joinselect.mr.utils.TripleEntry;
+
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.junit.Test;
+
+/**
+ * MRUnit ReduceDriver tests for JoinSelectAggregate.JoinReducer.
+ *
+ * Each test hands the reducer one CompositeType key together with a list of
+ * TripleCards, some wrapping CardinalityType values and some wrapping
+ * TripleEntry values, and expects one (TripleEntry, CardList) output per
+ * TripleEntry in the list.
+ */
+public class JoinReducerTest {
+
+  /** NUL character joining the two constants of a composite key. */
+  private static final String DELIM = "\u0000";
+
+  /**
+   * One TripleEntry and five single-position cardinalities.
+   *
+   * Expected CardList is (25, 31, 45, 0, 0, 0). "subject" has one candidate
+   * (25). Where two candidates exist, the expected count is the one whose
+   * third CardinalityType argument is smaller: "predicate" 31 (argument 1)
+   * over 27 (argument 2), "object" 45 (argument 0) over 29 (argument 2).
+   * NOTE(review): the third argument is presumably a timestamp -- confirm.
+   */
+  @Test
+  public void testSingleConstCard() throws InterruptedException, IOException {
+
+    CompositeType ct = new CompositeType("urn:gem:etype#1234", 1);
+    TripleEntry te = new TripleEntry("urn:gem#pred", "urn:gem:etype#4567", "predicate", "object", "subject");
+    CardinalityType c5 = new CardinalityType(45, "object", 0);
+    CardinalityType c1 = new CardinalityType(25, "subject", 2);
+    CardinalityType c2 = new CardinalityType(27, "predicate", 2);
+    CardinalityType c3 = new CardinalityType(29, "object", 2);
+    CardinalityType c4 = new CardinalityType(31, "predicate", 1);
+    List<TripleCard> list = new ArrayList<TripleCard>();
+    list.add(new TripleCard(c1));
+    list.add(new TripleCard(c2));
+    list.add(new TripleCard(c3));
+    list.add(new TripleCard(c4));
+    list.add(new TripleCard(c5));
+    list.add(new TripleCard(te));
+
+    new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>()
+        .withReducer(new JoinSelectAggregate.JoinReducer())
+        .withInput(ct, list)
+        .withOutput(te, new CardList(25, 31, 45, 0, 0, 0))
+        .runTest();
+
+  }
+
+  /**
+   * Same cardinalities as testSingleConstCard, but with two TripleEntry
+   * values in the list. Both entries are expected in list order (te1, then
+   * te2), each paired with CardList(25, 31, 45, 0, 0, 0).
+   */
+  @Test
+  public void testTwoTripleEntry() throws InterruptedException, IOException {
+
+    CompositeType ct = new CompositeType("urn:gem:etype#1234", 1);
+    TripleEntry te1 = new TripleEntry("urn:gem#pred", "urn:gem:etype#4567", "predicate", "object", "subject");
+    TripleEntry te2 = new TripleEntry("urn:gem#8910", "urn:gem:etype#4567", "subject", "predicate", "object");
+    CardinalityType c5 = new CardinalityType(45, "object", 0);
+    CardinalityType c1 = new CardinalityType(25, "subject", 2);
+    CardinalityType c2 = new CardinalityType(27, "predicate", 2);
+    CardinalityType c3 = new CardinalityType(29, "object", 2);
+    CardinalityType c4 = new CardinalityType(31, "predicate", 1);
+    List<TripleCard> list = new ArrayList<TripleCard>();
+    list.add(new TripleCard(c1));
+    list.add(new TripleCard(c2));
+    list.add(new TripleCard(c3));
+    list.add(new TripleCard(c4));
+    list.add(new TripleCard(c5));
+    list.add(new TripleCard(te1));
+    list.add(new TripleCard(te2));
+
+    new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>()
+        .withReducer(new JoinSelectAggregate.JoinReducer())
+        .withInput(ct, list)
+        .withOutput(te1, new CardList(25, 31, 45, 0, 0, 0))
+        .withOutput(te2, new CardList(25, 31, 45, 0, 0, 0))
+        .runTest();
+
+  }
+
+  /**
+   * Two-position cardinalities under a composite key built from two
+   * constants joined by DELIM.
+   *
+   * Expected CardList is (0, 0, 0, 31, 29, 45) for both entries. For each
+   * label the expected count is again the candidate with the smaller third
+   * argument: "subjectpredicate" 31 (1) over 56 (2), "predicateobject"
+   * 29 (2) over 27 (5), "subjectobject" 45 (0) over 25 (2).
+   */
+  @Test
+  public void testTwoConstCard() throws InterruptedException, IOException {
+
+    CompositeType ct1 = new CompositeType("urn:gem#pred" + DELIM + "urn:gem:etype#1234", 1);
+    TripleEntry te1 = new TripleEntry("uri:testSubject", "", "subject", "", "predicateobject");
+    TripleEntry te2 = new TripleEntry("uri:testSubject", "", "subject", "", "objectpredicate");
+
+    CardinalityType c5 = new CardinalityType(45, "subjectobject", 0);
+    CardinalityType c1 = new CardinalityType(25, "subjectobject", 2);
+    CardinalityType c2 = new CardinalityType(27, "predicateobject", 5);
+    CardinalityType c3 = new CardinalityType(29, "predicateobject", 2);
+    CardinalityType c4 = new CardinalityType(31, "subjectpredicate", 1);
+    CardinalityType c6 = new CardinalityType(56, "subjectpredicate", 2);
+
+    List<TripleCard> list1 = new ArrayList<TripleCard>();
+
+    list1.add(new TripleCard(c1));
+    list1.add(new TripleCard(c2));
+    list1.add(new TripleCard(c3));
+    list1.add(new TripleCard(c4));
+    list1.add(new TripleCard(c5));
+    list1.add(new TripleCard(c6));
+    list1.add(new TripleCard(te1));
+    list1.add(new TripleCard(te2));
+
+    new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>()
+        .withReducer(new JoinSelectAggregate.JoinReducer())
+        .withInput(ct1, list1)
+        .withOutput(te1, new CardList(0, 0, 0, 31, 29, 45))
+        .withOutput(te2, new CardList(0, 0, 0, 31, 29, 45))
+        .runTest();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java
new file mode 100644
index 0000000..e9281da
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java
@@ -0,0 +1,93 @@
+package mvm.rya.joinselect.mr;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+import mvm.rya.joinselect.mr.JoinSelectSpoTableOutput;
+import mvm.rya.joinselect.mr.utils.CompositeType;
+import mvm.rya.joinselect.mr.utils.TripleCard;
+import mvm.rya.joinselect.mr.utils.TripleEntry;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolver;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Test;
+
+/**
+ * MRUnit MapDriver test for JoinSelectSpoTableOutput.JoinSelectMapper.
+ */
+public class JoinSelectMapperTest {
+
+  /** NUL character joining two triple components into one composite key. */
+  private static final String DELIM = "\u0000";
+
+  /**
+   * Serializes one RyaStatement with WholeRowTripleResolver, feeds the SPO
+   * row to the mapper as an Accumulo Key, and expects nine
+   * (CompositeType, TripleCard) outputs in the order listed below. Every
+   * expected CompositeType carries IntWritable(2).
+   *
+   * @throws TripleRowResolverException declared for the serialize call
+   * @throws IOException declared for the MRUnit driver calls
+   */
+  @Test
+  public void testOutput() throws TripleRowResolverException, IOException {
+
+    RyaStatement rya = new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1"));
+
+    // Single components and every ordered pair of components joined by DELIM.
+    Text s = new Text(rya.getSubject().getData());
+    Text p = new Text(rya.getPredicate().getData());
+    Text o = new Text(rya.getObject().getData());
+    Text sp = new Text(rya.getSubject().getData() + DELIM + rya.getPredicate().getData());
+    Text so = new Text(rya.getSubject().getData() + DELIM + rya.getObject().getData());
+    Text po = new Text(rya.getPredicate().getData() + DELIM + rya.getObject().getData());
+    Text ps = new Text(rya.getPredicate().getData() + DELIM + rya.getSubject().getData());
+    Text op = new Text(rya.getObject().getData() + DELIM + rya.getPredicate().getData());
+    Text os = new Text(rya.getObject().getData() + DELIM + rya.getSubject().getData());
+
+    // t1-t3 carry two constants; t4-t9 carry one constant and a two-part last label.
+    TripleEntry t1 = new TripleEntry(s, p, new Text("subject"), new Text("predicate"), new Text("object"));
+    TripleEntry t2 = new TripleEntry(p, o, new Text("predicate"), new Text("object"), new Text("subject"));
+    TripleEntry t3 = new TripleEntry(o, s, new Text("object"), new Text("subject"), new Text("predicate"));
+    TripleEntry t4 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("subjectpredicate"));
+    TripleEntry t5 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("objectsubject"));
+    TripleEntry t6 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("predicateobject"));
+    TripleEntry t7 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("objectpredicate"));
+    TripleEntry t8 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("subjectobject"));
+    TripleEntry t9 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("predicatesubject"));
+
+    TripleRowResolver trr = new WholeRowTripleResolver();
+    Map<TABLE_LAYOUT,TripleRow> map = trr.serialize(rya);
+    TripleRow tr = map.get(TABLE_LAYOUT.SPO);
+
+    // Key fields come from the SPO row; the fourth argument is empty and the last is 1.
+    byte[] b = new byte[0];
+    Key key = new Key(tr.getRow(), tr.getColumnFamily(), tr.getColumnQualifier(), b, 1);
+    Value val = new Value(b);
+
+    // NOTE(review): for seven of the nine outputs the key's component order
+    // matches the entry's last label (e.g. po with "predicateobject"), but so
+    // is paired with "objectsubject" (t5) and os with "subjectobject" (t8) --
+    // confirm this is the mapper's intended contract.
+    new MapDriver<Key,Value,CompositeType,TripleCard>()
+        .withMapper(new JoinSelectSpoTableOutput.JoinSelectMapper())
+        .withInput(key, val)
+        .withOutput(new CompositeType(o, new IntWritable(2)), new TripleCard(t1))
+        .withOutput(new CompositeType(s, new IntWritable(2)), new TripleCard(t2))
+        .withOutput(new CompositeType(p, new IntWritable(2)), new TripleCard(t3))
+        .withOutput(new CompositeType(po, new IntWritable(2)), new TripleCard(t6))
+        .withOutput(new CompositeType(so, new IntWritable(2)), new TripleCard(t5))
+        .withOutput(new CompositeType(sp, new IntWritable(2)), new TripleCard(t4))
+        .withOutput(new CompositeType(op, new IntWritable(2)), new TripleCard(t7))
+        .withOutput(new CompositeType(os, new IntWritable(2)), new TripleCard(t8))
+        .withOutput(new CompositeType(ps, new IntWritable(2)), new TripleCard(t9))
+        .runTest();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java
 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java
new file mode 100644
index 0000000..c00f8ff
--- /dev/null
+++ 
b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java
@@ -0,0 +1,69 @@
+package mvm.rya.joinselect.mr;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import mvm.rya.joinselect.mr.JoinSelectProspectOutput;
+import mvm.rya.joinselect.mr.utils.CardinalityType;
+import mvm.rya.joinselect.mr.utils.CompositeType;
+import mvm.rya.joinselect.mr.utils.TripleCard;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.junit.Test;
+
+public class JoinSelectProspectOutputTest {
+    /*
+     * MRUnit MapDriver test for JoinSelectProspectOutput.CardinalityMapper.
+     *
+     * DELIM is the NUL character separating the parts of each input row.
+     * TripleValueType supplies the label that starts each row; only subject,
+     * predicate and subjectpredicate are used by this test.
+     */
+    private static final String DELIM = "\u0000";
+
+    public enum TripleValueType {
+        subject, predicate, object, subjectpredicate, predicateobject, 
subjectobject
+    }
+
+    @Test
+    public void testOutput() throws InterruptedException, IOException {
+        /*
+         * Three entries are mapped. Each input row is
+         * label + DELIM + constant(s) + DELIM + trailing number, and each
+         * value holds a count written as decimal digits. For every entry the
+         * expected output is CompositeType(constant(s), 1) paired with a
+         * TripleCard wrapping CardinalityType(count, label, trailing number).
+         */
+        String s = "urn:gem:etype#1234";
+        String p = "urn:gem#pred";
+        // 18-digit trailing number for the third row; the expected output parses it with Long.parseLong.
+        String ts = "798497748386999999";
+        
+        Text t1 = new Text(TripleValueType.subject.name() + DELIM + s + DELIM 
+ 1);
+        Text t2 = new Text(TripleValueType.predicate.name() + DELIM + p + 
DELIM + 2);
+        Text t3 = new Text(TripleValueType.subjectpredicate.name() + DELIM + s 
+ DELIM + p + DELIM + ts);
+        // b is reused for the empty key fields; c, d, e use String.getBytes(), i.e. the platform default charset.
+        byte[] b = new byte[0];
+        byte[] c = "25".getBytes();
+        byte[] d = "47".getBytes();
+        byte[] e = "15".getBytes();
+        // NOTE(review): Text.getBytes() exposes the backing array, documented as valid only up to getLength() -- confirm no padding reaches the Key row.
+        Key key1 = new Key(t1.getBytes(), b, b, b, 1);
+        Key key2 = new Key(t2.getBytes(), b, b, b, 1);
+        Key key3 = new Key(t3.getBytes(), b, b, b, 1);
+        Value val1 = new Value(c);
+        Value val2 = new Value(d);
+        Value val3 = new Value(e);
+        
+       
+
+        // System.out.println("Keys are " + key1 + " and " + key2);
+
+        new MapDriver<Key, Value, CompositeType, TripleCard>()
+                .withMapper(new JoinSelectProspectOutput.CardinalityMapper())
+                .withInput(key1, val1)
+                .withInput(key2, val2)
+                .withInput(key3, val3)
+                .withOutput(new CompositeType(s, 1), new TripleCard(new 
CardinalityType(25, "subject", 1)))
+                .withOutput(new CompositeType(p, 1), new TripleCard(new 
CardinalityType(47, "predicate", 2)))
+                .withOutput(new CompositeType(s + DELIM + p, 1),
+                        new TripleCard(new CardinalityType(15, 
"subjectpredicate", Long.parseLong(ts)))).runTest();
+
+    }
+
+}

Reply via email to