http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
new file mode 100644
index 0000000..59e7611
--- /dev/null
+++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectStatsUtil.java
@@ -0,0 +1,251 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
+import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.INSTANCE;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PASSWORD;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.USERNAME;
+import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.ZOOKEEPERS;
+
+import java.io.IOException;
+
+import mvm.rya.api.resolver.triple.TripleRow;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+/**
+ * Static helpers for the join-selectivity statistics MapReduce jobs: each {@code init*} method
+ * wires input/output formats and key/value classes onto a {@link Job}, and
+ * {@link #createMutation(TripleRow)} converts a {@link TripleRow} into an Accumulo {@link Mutation}.
+ *
+ * <p>The Accumulo-backed jobs read their connection settings from the job configuration under
+ * the {@code USERNAME}, {@code PASSWORD}, {@code INSTANCE} and {@code ZOOKEEPERS} keys of
+ * {@link JoinSelectConstants}.
+ */
+public class JoinSelectStatsUtil {
+
+    /**
+     * Configures a job that reads sequence files from {@code inputPath}, emits
+     * {@link TripleEntry}/{@link CardList} pairs from its mappers, and writes {@link Mutation}s
+     * to the Accumulo table {@code outtable}.
+     *
+     * @param job the job to configure
+     * @param inputPath sequence-file input path
+     * @param outtable default Accumulo output table
+     * @param auths not used by this method; kept for signature compatibility
+     * @throws AccumuloSecurityException propagated from {@code AccumuloOutputFormat.setConnectorInfo}
+     * @throws IOException propagated from {@code SequenceFileInputFormat.addInputPath}
+     * @throws IllegalArgumentException if the {@code ZOOKEEPERS} key is not set
+     */
+    public static void initSumMRJob(Job job, String inputPath, String outtable, String auths) throws AccumuloSecurityException, IOException {
+        Configuration conf = job.getConfiguration();
+        String username = conf.get(USERNAME);
+        String password = conf.get(PASSWORD);
+        String instance = conf.get(INSTANCE);
+        String zookeepers = conf.get(ZOOKEEPERS);
+        requireZookeepers(zookeepers);
+
+        AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+        // INPUT
+        SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setMapOutputKeyClass(TripleEntry.class);
+        job.setMapOutputValueClass(CardList.class);
+
+        // OUTPUT
+        AccumuloOutputFormat.setDefaultTableName(job, outtable);
+        job.setOutputFormatClass(AccumuloOutputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+    }
+
+    /**
+     * Configures a job that scans the Accumulo table {@code intable}, emits
+     * {@link Text}/{@link IntWritable} pairs from its mappers, and writes {@link Mutation}s to
+     * the Accumulo table {@code outtable}.
+     *
+     * @param job the job to configure
+     * @param intable Accumulo input table
+     * @param outtable default Accumulo output table
+     * @param auths scan authorizations, passed to {@code new Authorizations(auths)}
+     * @throws AccumuloSecurityException propagated from the {@code setConnectorInfo} calls
+     * @throws IllegalArgumentException if the {@code ZOOKEEPERS} key is not set
+     */
+    public static void initTableMRJob(Job job, String intable, String outtable, String auths) throws AccumuloSecurityException {
+        Configuration conf = job.getConfiguration();
+        String username = conf.get(USERNAME);
+        String password = conf.get(PASSWORD);
+        String instance = conf.get(INSTANCE);
+        String zookeepers = conf.get(ZOOKEEPERS);
+
+        System.out.println("Zookeepers are " + zookeepers);
+        requireZookeepers(zookeepers);
+
+        AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
+        AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+        // INPUT
+        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        // NOTE(review): Authorizations(String...) treats this argument as one label, so a
+        // comma-separated list would not be split here -- confirm what callers pass.
+        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
+        AccumuloInputFormat.setInputTableName(job, intable);
+        job.setInputFormatClass(AccumuloInputFormat.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+
+        // OUTPUT
+        AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        AccumuloOutputFormat.setDefaultTableName(job, outtable);
+        job.setOutputFormatClass(AccumuloOutputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+    }
+
+    /**
+     * Configures a job that scans the Accumulo table {@code intable} and writes
+     * {@link CompositeType}/{@link TripleCard} pairs to sequence files under {@code outpath}.
+     *
+     * @param job the job to configure
+     * @param intable Accumulo input table
+     * @param outpath sequence-file output directory
+     * @param auths scan authorizations, passed to {@code new Authorizations(auths)}
+     * @throws AccumuloSecurityException propagated from {@code AccumuloInputFormat.setConnectorInfo}
+     * @throws IllegalArgumentException if the {@code ZOOKEEPERS} key is not set
+     */
+    public static void initTabToSeqFileJob(Job job, String intable, String outpath, String auths) throws AccumuloSecurityException {
+        Configuration conf = job.getConfiguration();
+        String username = conf.get(USERNAME);
+        String password = conf.get(PASSWORD);
+        String instance = conf.get(INSTANCE);
+        String zookeepers = conf.get(ZOOKEEPERS);
+
+        System.out.println("Zookeepers are " + zookeepers);
+        requireZookeepers(zookeepers);
+
+        AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers);
+
+        // INPUT
+        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
+        AccumuloInputFormat.setInputTableName(job, intable);
+        job.setInputFormatClass(AccumuloInputFormat.class);
+        job.setMapOutputKeyClass(CompositeType.class);
+        job.setMapOutputValueClass(TripleCard.class);
+
+        // OUTPUT
+        SequenceFileOutputFormat.setOutputPath(job, new Path(outpath));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(CompositeType.class);
+        job.setOutputValueClass(TripleCard.class);
+    }
+
+    /**
+     * Configures a job that feeds the sequence files at {@code prospectsPath} and
+     * {@code spoPath} through {@code mapperClass}, emits {@link CompositeType}/{@link TripleCard}
+     * map output, and writes {@link TripleEntry}/{@link CardList} pairs to sequence files under
+     * {@code outPath}. No Accumulo connection settings are applied.
+     *
+     * @param job the job to configure
+     * @param prospectsPath first sequence-file input path
+     * @param spoPath second sequence-file input path
+     * @param mapperClass mapper applied to both inputs
+     * @param outPath sequence-file output directory
+     * @param auths not used by this method; kept for signature compatibility
+     * @throws AccumuloSecurityException declared for signature compatibility; not thrown by this body
+     */
+    public static void initJoinMRJob(Job job, String prospectsPath, String spoPath, Class<? extends Mapper<CompositeType, TripleCard, ?, ?>> mapperClass,
+            String outPath, String auths) throws AccumuloSecurityException {
+        MultipleInputs.addInputPath(job, new Path(prospectsPath), SequenceFileInputFormat.class, mapperClass);
+        MultipleInputs.addInputPath(job, new Path(spoPath), SequenceFileInputFormat.class, mapperClass);
+        job.setMapOutputKeyClass(CompositeType.class);
+        job.setMapOutputValueClass(TripleCard.class);
+
+        SequenceFileOutputFormat.setOutputPath(job, new Path(outPath));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(TripleEntry.class);
+        job.setOutputValueClass(CardList.class);
+    }
+
+    /**
+     * Converts a {@link TripleRow} into an Accumulo {@link Mutation} with a single column
+     * update. A null column family, qualifier, visibility or value falls back to the shared
+     * empty constant, and the update carries an explicit timestamp only when the row has one.
+     *
+     * @param tripleRow the row to convert; its row bytes become the mutation's row id
+     * @return the new mutation
+     */
+    public static Mutation createMutation(TripleRow tripleRow) {
+        Mutation mutation = new Mutation(new Text(tripleRow.getRow()));
+        byte[] columnVisibility = tripleRow.getColumnVisibility();
+        ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility);
+        byte[] value = tripleRow.getValue();
+        Value v = value == null ? EMPTY_VALUE : new Value(value);
+        byte[] columnQualifier = tripleRow.getColumnQualifier();
+        Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier);
+        byte[] columnFamily = tripleRow.getColumnFamily();
+        Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily);
+
+        // Only pass a timestamp when the row carries one; otherwise use the overload without it.
+        Long timestamp = tripleRow.getTimestamp();
+        if (timestamp != null) {
+            mutation.put(cfText, cqText, cv, timestamp, v);
+        } else {
+            mutation.put(cfText, cqText, cv, v);
+        }
+        return mutation;
+    }
+
+    /**
+     * Fails fast when the {@code ZOOKEEPERS} configuration key is missing, before any Accumulo
+     * connection settings are applied to the job.
+     *
+     * @param zookeepers value read from the {@code ZOOKEEPERS} configuration key
+     * @throws IllegalArgumentException if {@code zookeepers} is null
+     */
+    private static void requireZookeepers(String zookeepers) {
+        if (zookeepers == null) {
+            throw new IllegalArgumentException("Must specify zookeepers");
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
new file mode 100644
index 0000000..3843600
--- /dev/null
+++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleCard.java
@@ -0,0 +1,191 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A Hadoop {@link WritableComparable} holding either a {@link CardinalityType} or a
+ * {@link TripleEntry}; the serialized form starts with a boolean tag saying which.
+ *
+ * <p>Setters load their argument into one of two reusable buffers owned by this instance
+ * (via {@code CardinalityType.set} or {@link TripleEntry#setTE(TripleEntry)}), and the getters
+ * return those buffers, so a returned object changes on the next {@code set*} or
+ * {@link #readFields(DataInput)} call.
+ */
+public class TripleCard implements WritableComparable<TripleCard> {
+
+    // Exactly one of these is non-null after a set*/readFields call; both are null on an
+    // instance created with the no-arg constructor.
+    private CardinalityType card = null;
+    private TripleEntry te = null;
+
+    // Reusable buffers that card/te point at when set.
+    private CardinalityType tempCard = new CardinalityType();
+    private TripleEntry tempTe = new TripleEntry();
+
+    /** Creates an empty instance, e.g. to be populated by {@link #readFields(DataInput)}. */
+    public TripleCard() {}
+
+    /** Creates an instance holding {@code card}; see {@link #setCard(CardinalityType)}. */
+    public TripleCard(CardinalityType card) {
+        this.setCard(card);
+    }
+
+    /** Creates an instance holding {@code te}; see {@link #setTE(TripleEntry)}. */
+    public TripleCard(TripleEntry te) {
+        this.setTE(te);
+    }
+
+    /** Loads {@code card} into the cardinality buffer and clears any held triple entry. */
+    public void setCard(CardinalityType card) {
+        tempCard.set(card);
+        this.card = tempCard;
+        this.te = null;
+    }
+
+    /** Copies {@code te} into the triple-entry buffer and clears any held cardinality. */
+    public void setTE(TripleEntry te) {
+        tempTe.setTE(te);
+        this.te = tempTe;
+        this.card = null;
+    }
+
+    /** Returns the held cardinality (the reusable buffer), or null if none is held. */
+    public CardinalityType getCard() {
+        return this.card;
+    }
+
+    /** Returns the held triple entry (the reusable buffer), or null if none is held. */
+    public TripleEntry getTE() {
+        return this.te;
+    }
+
+    /** Returns true if no cardinality is held. */
+    public boolean isCardNull() {
+        return (card == null);
+    }
+
+    /** Returns true if no triple entry is held. */
+    public boolean isTeNull() {
+        return (te == null);
+    }
+
+    /**
+     * Writes a boolean tag ({@code true} for a cardinality, {@code false} for a triple entry)
+     * followed by the held value.
+     *
+     * @throws IllegalStateException if neither value has been set
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (card != null) {
+            out.writeBoolean(true);
+            card.write(out);
+        } else if (te != null) {
+            out.writeBoolean(false);
+            te.write(out);
+        } else {
+            throw new IllegalStateException("TripleCard holds neither a CardinalityType nor a TripleEntry");
+        }
+    }
+
+    /** Reads the format produced by {@link #write(DataOutput)} into the reusable buffers. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        if (in.readBoolean()) {
+            tempCard.readFields(in);
+            card = tempCard;
+            te = null;
+        } else {
+            tempTe.readFields(in);
+            te = tempTe;
+            card = null;
+        }
+    }
+
+    /** Hashes the held value; an empty instance hashes to a constant. */
+    @Override
+    public int hashCode() {
+        int result = 7;
+        if (card != null) {
+            result = result * 17 + card.hashCode();
+        } else if (te != null) {
+            result = result * 17 + te.hashCode();
+        }
+        return result;
+    }
+
+    /**
+     * Two instances are equal when both hold a cardinality and those are equal, when both hold
+     * a triple entry and those are equal, or when both are empty.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof TripleCard)) {
+            return false;
+        }
+        TripleCard other = (TripleCard) o;
+        if (card != null || other.card != null) {
+            return card != null && other.card != null && card.equals(other.card);
+        }
+        return Objects.equals(te, other.te);
+    }
+
+    /** Returns the held value's string form, or "null" for an empty instance. */
+    @Override
+    public String toString() {
+        return card != null ? card.toString() : String.valueOf(te);
+    }
+
+    /**
+     * Orders by the held value when both instances hold the same kind; otherwise cardinalities
+     * sort before triple entries, which sort before empty instances.
+     */
+    @Override
+    public int compareTo(TripleCard o) {
+        if (card != null && o.card != null) {
+            return card.compareTo(o.card);
+        }
+        if (te != null && o.te != null) {
+            return te.compareTo(o.te);
+        }
+        // Different kinds, or at least one empty instance: fall back to a fixed kind order.
+        return Integer.compare(kindRank(), o.kindRank());
+    }
+
+    /** Sort rank of the held kind: 0 = cardinality, 1 = triple entry, 2 = empty. */
+    private int kindRank() {
+        if (card != null) {
+            return 0;
+        }
+        return te != null ? 1 : 2;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
new file mode 100644
index 0000000..8f3769c
--- /dev/null
+++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/TripleEntry.java
@@ -0,0 +1,187 @@
+package mvm.rya.joinselect.mr.utils;
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 - 2015 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A Hadoop {@link WritableComparable} made of five {@link Text} fields: {@code first},
+ * {@code second}, {@code firstPos}, {@code secondPos} and {@code keyPos}.
+ *
+ * <p>NOTE(review): the {@code Text}-argument constructor, {@link #setEntry(Text, Text)} and
+ * {@link #setPosition(Text, Text, Text)} store the caller's objects by reference, whereas
+ * {@link #setTE(TripleEntry)} copies into the current field objects -- so calling
+ * {@code setTE} after one of the former also mutates the caller's {@code Text}s. Confirm
+ * whether callers rely on that sharing before changing it.
+ */
+public class TripleEntry implements WritableComparable<TripleEntry> {
+
+    private Text first;
+    private Text second;
+    private Text firstPos;
+    private Text secondPos;
+    private Text keyPos;
+
+    /** Creates an entry whose five fields are new, empty {@link Text}s. */
+    public TripleEntry() {
+        first = new Text();
+        second = new Text();
+        firstPos = new Text();
+        secondPos = new Text();
+        keyPos = new Text();
+    }
+
+    /** Creates an entry whose fields are new {@link Text}s built from the given strings. */
+    public TripleEntry(String first, String second, String firstPos, String secondPos, String keyPos) {
+        this.first = new Text(first);
+        this.second = new Text(second);
+        this.firstPos = new Text(firstPos);
+        this.secondPos = new Text(secondPos);
+        this.keyPos = new Text(keyPos);
+    }
+
+    /** Creates an entry that stores the given {@link Text} objects by reference (no copy). */
+    public TripleEntry(Text first, Text second, Text firstPos, Text secondPos, Text keyPos) {
+        this.first = first;
+        this.second = second;
+        this.firstPos = firstPos;
+        this.secondPos = secondPos;
+        this.keyPos = keyPos;
+    }
+
+    /** Replaces {@code first} and {@code second} with the given objects (by reference). */
+    public void setEntry(Text first, Text second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    /** Replaces the three position fields with the given objects (by reference). */
+    public void setPosition(Text firstPos, Text secondPos, Text keyPos) {
+        this.firstPos = firstPos;
+        this.secondPos = secondPos;
+        this.keyPos = keyPos;
+    }
+
+    /** Copies the contents of all five fields of {@code te} into this entry's field objects. */
+    public void setTE(TripleEntry te) {
+        this.first.set(te.first);
+        this.second.set(te.second);
+        this.firstPos.set(te.firstPos);
+        this.secondPos.set(te.secondPos);
+        this.keyPos.set(te.keyPos);
+    }
+
+    public Text getFirst() {
+        return this.first;
+    }
+
+    public Text getSecond() {
+        return this.second;
+    }
+
+    public Text getFirstPos() {
+        return this.firstPos;
+    }
+
+    public Text getSecondPos() {
+        return this.secondPos;
+    }
+
+    public Text getKeyPos() {
+        return this.keyPos;
+    }
+
+    /** Serializes the fields in the order first, second, firstPos, secondPos, keyPos. */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        first.write(out);
+        second.write(out);
+        firstPos.write(out);
+        secondPos.write(out);
+        keyPos.write(out);
+    }
+
+    /** Reads the fields into the current field objects, in the order used by {@link #write(DataOutput)}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        first.readFields(in);
+        second.readFields(in);
+        firstPos.readFields(in);
+        secondPos.readFields(in);
+        keyPos.readFields(in);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = 7;
+        result = result * 17 + first.hashCode();
+        result = result * 17 + second.hashCode();
+        result = result * 17 + firstPos.hashCode();
+        result = result * 17 + secondPos.hashCode();
+        result = result * 17 + keyPos.hashCode();
+        return result;
+    }
+
+    /** Equal when all five fields are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof TripleEntry) {
+            TripleEntry trip = (TripleEntry) o;
+            return first.equals(trip.first) && second.equals(trip.second) && firstPos.equals(trip.firstPos) && secondPos.equals(trip.secondPos)
+                    && keyPos.equals(trip.keyPos);
+        }
+        return false;
+    }
+
+    /** Tab-separated: first, firstPos, second, secondPos, keyPos. */
+    @Override
+    public String toString() {
+        return first + "\t" + firstPos + "\t" + second + "\t" + secondPos + "\t" + keyPos;
+    }
+
+    /** Compares field by field in the order first, firstPos, second, secondPos, keyPos. */
+    @Override
+    public int compareTo(TripleEntry o) {
+        int cmp = first.compareTo(o.first);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = firstPos.compareTo(o.firstPos);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = second.compareTo(o.second);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = secondPos.compareTo(o.secondPos);
+        if (cmp != 0) {
+            return cmp;
+        }
+        return keyPos.compareTo(o.keyPos);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan b/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
new file mode 100644
index 0000000..38258c1
--- /dev/null
+++ b/extras/rya.prospector/src/main/resources/META-INF/services/mvm.rya.prospector.plans.IndexWorkPlan
@@ -0,0 +1 @@
+mvm.rya.prospector.plans.impl.CountPlan
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
new file mode 100644
index 0000000..f3ef96e
--- /dev/null
+++ b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/mr/ProspectorTest.groovy
@@ -0,0 +1,172 @@
+package mvm.rya.prospector.mr
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.Iterators
+import com.google.common.collect.Lists
+import mvm.rya.accumulo.AccumuloRyaDAO
+import mvm.rya.accumulo.AccumuloRdfConfiguration
+import mvm.rya.api.persist.RdfEvalStatsDAO
+import mvm.rya.api.domain.RyaStatement
+import mvm.rya.api.domain.RyaType
+import mvm.rya.api.domain.RyaURI
+import mvm.rya.prospector.domain.IndexEntry
+import mvm.rya.prospector.domain.TripleValueType
+import mvm.rya.prospector.service.ProspectorService
+import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO
+import mvm.rya.prospector.utils.ProspectorConstants
+import org.apache.accumulo.core.client.Instance
+import org.apache.accumulo.core.client.mock.MockInstance
+import org.apache.accumulo.core.security.Authorizations
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ToolRunner
+import org.junit.Test
+import org.openrdf.model.vocabulary.XMLSchema
+import org.openrdf.model.impl.URIImpl
+
+import static org.junit.Assert.assertEquals
+import org.openrdf.model.impl.LiteralImpl
+import org.openrdf.model.Value
+
+/**
+ * Runs the {@link Prospector} job over a few statements stored in a {@link MockInstance} and
+ * checks the counts that {@link ProspectorService} returns for them.
+ *
+ * Date: 12/4/12
+ * Time: 4:33 PM
+ */
+class ProspectorTest {
+
+    @Test
+    public void testCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+        def connector = mock.getConnector("user", "pass".bytes)
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+        connector.tableOperations().create(outtable)
+
+        try {
+            AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+            ryaDAO.setConnector(connector);
+            ryaDAO.init()
+
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata2")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred"), new RyaType(XMLSchema.INTEGER, "12")))
+            ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred1"), new RyaType("12")))
+
+            def confFile = "stats_cluster_config.xml"
+            def confPath = new Path(getClass().getClassLoader().getResource(confFile).toString())
+            def args = (String[]) [confPath];
+            ToolRunner.run(new Prospector(), args);
+            debugTable(connector, outtable)
+
+            ryaDAO.destroy()
+
+            def service = new ProspectorService(connector, outtable)
+            def auths = (String[]) ["U", "FOUO"]
+            def prospects = service.getProspects(auths)
+            def plist = Lists.newArrayList(prospects)
+            assertEquals(1, plist.size())
+
+            prospects = service.getProspectsInRange(System.currentTimeMillis() - 100000, System.currentTimeMillis() + 10000, auths)
+            plist = Lists.newArrayList(prospects)
+            assertEquals(1, plist.size())
+
+            List<String> queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem:etype");
+            def query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.entity.name(), queryTerms, XMLSchema.ANYURI.stringValue(), auths)
+            assertEquals(1, query.size())
+//            assertEquals(
+//                    new IndexEntry(index: ProspectorConstants.COUNT, data: "urn:gem:etype", dataType: XMLSchema.ANYURI.stringValue(),
+//                            tripleValueType: TripleValueType.entity, visibility: "", count: -1, timestamp: plist.get(0)),
+//                    query.get(0))
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem:etype#1234");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.subject.name(), queryTerms, XMLSchema.ANYURI.stringValue(), auths)
+            assertEquals(1, query.size())
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem#pred");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.predicate.name(), queryTerms, XMLSchema.ANYURI.stringValue(), auths)
+            assertEquals(1, query.size())
+            assertEquals(
+                    new IndexEntry(index: ProspectorConstants.COUNT, data: "urn:gem#pred", dataType: XMLSchema.ANYURI.stringValue(),
+                            tripleValueType: TripleValueType.predicate, visibility: "", count: 4L, timestamp: plist.get(0)),
+                    query.get(0))
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("mydata1");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.object.name(), queryTerms, XMLSchema.STRING.stringValue(), auths)
+            assertEquals(1, query.size())
+//            assertEquals(
+//                    new IndexEntry(index: ProspectorConstants.COUNT, data: "mydata1", dataType: XMLSchema.STRING.stringValue(),
+//                            tripleValueType: TripleValueType.object, visibility: "", count: -1, timestamp: plist.get(0)),
+//                    query.get(0))
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem:etype#1234");
+            queryTerms.add("urn:gem#pred");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.subjectpredicate.name(), queryTerms, XMLSchema.STRING.stringValue(), auths)
+            assertEquals(1, query.size())
+//            assertEquals(
+//                    new IndexEntry(index: ProspectorConstants.COUNT, data: "urn:gem:etype#1234" + "\u0000" + "urn:gem#pred", dataType: XMLSchema.STRING.stringValue(),
+//                            tripleValueType: TripleValueType.subjectpredicate, visibility: "", count: -1, timestamp: plist.get(0)),
+//                    query.get(0))
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem#pred");
+            queryTerms.add("12");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.predicateobject.name(), queryTerms, XMLSchema.STRING.stringValue(), auths)
+            assertEquals(1, query.size())
+//            assertEquals(
+//                    new IndexEntry(index: ProspectorConstants.COUNT, data: "urn:gem#pred" + "\u0000" + "12", dataType: XMLSchema.STRING.stringValue(),
+//                            tripleValueType: TripleValueType.predicateobject, visibility: "", count: -1, timestamp: plist.get(0)),
+//                    query.get(0))
+
+            queryTerms = new ArrayList<String>();
+            queryTerms.add("urn:gem:etype#1234");
+            queryTerms.add("mydata1");
+            query = service.query(plist, ProspectorConstants.COUNT, TripleValueType.subjectobject.name(), queryTerms, XMLSchema.STRING.stringValue(), auths)
+
+            assertEquals(1, query.size())
+//            assertEquals(
+//                    new IndexEntry(index: ProspectorConstants.COUNT, data: "urn:gem:etype#1234" + "\u0000" + "mydata1", dataType: XMLSchema.STRING.stringValue(),
+//                            tripleValueType: TripleValueType.subjectobject, visibility: "", count: -1, timestamp: plist.get(0)),
+//                    query.get(0))
+        } finally {
+            // Drop the prospects table even when an assertion above fails.
+            connector.tableOperations().delete(outtable)
+        }
+    }
+
+    /** Prints every entry of {@code table} visible with the U and FOUO authorizations. */
+    private void debugTable(def connector, String table) {
+        connector.createScanner(table, new Authorizations((String[]) ["U", "FOUO"])).iterator().each {
+            println it
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
new file mode 100644
index 0000000..275c6d5
--- /dev/null
+++ b/extras/rya.prospector/src/test/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAOTest.groovy
@@ -0,0 +1,167 @@
+package mvm.rya.prospector.service
+
+/*
+ * #%L
+ * mvm.rya.rya.prospector
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.Iterators
+import mvm.rya.accumulo.AccumuloRdfConfiguration
+import mvm.rya.accumulo.AccumuloRyaDAO
+import mvm.rya.api.domain.RyaStatement
+import mvm.rya.api.domain.RyaType
+import mvm.rya.api.domain.RyaURI
+import mvm.rya.api.persist.RdfEvalStatsDAO
+import mvm.rya.prospector.mr.Prospector
+import org.apache.accumulo.core.client.Instance
+import org.apache.accumulo.core.client.mock.MockInstance
+import org.apache.accumulo.core.security.Authorizations
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ToolRunner
+import org.junit.Test
+import org.openrdf.model.impl.URIImpl
+import org.openrdf.model.vocabulary.XMLSchema
+
+import static org.junit.Assert.assertEquals
+import org.openrdf.model.impl.LiteralImpl
+import org.openrdf.model.Value
+
+/**
+ * Checks the cardinalities that {@link ProspectorServiceEvalStatsDAO} reports after the
+ * {@link Prospector} job has run over a few statements stored in a {@link MockInstance}.
+ *
+ * Date: 1/26/13
+ * Time: 3:00 PM
+ */
+class ProspectorServiceEvalStatsDAOTest {
+
+    @Test
+    public void testCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+        def connector = mock.getConnector("user", "pass".bytes)
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+        connector.tableOperations().create(outtable)
+
+        try {
+            AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+            ryaDAO.setConnector(connector);
+            ryaDAO.init()
+            def confPath = addStatementsAndRunProspector(ryaDAO)
+            debugTable(connector, outtable)
+            ryaDAO.destroy()
+
+            def conf = new Configuration()
+            conf.addResource(confPath)
+
+            def rdfConf = new AccumuloRdfConfiguration(conf)
+            rdfConf.setAuths("U","FOUO")
+            def evalDao = new ProspectorServiceEvalStatsDAO(connector, rdfConf)
+            evalDao.init()
+
+            assertExpectedCardinalities(evalDao, rdfConf)
+        } finally {
+            // Drop the prospects table even when an assertion above fails.
+            connector.tableOperations().delete(outtable)
+        }
+    }
+
+    @Test
+    public void testNoAuthsCount() throws Exception {
+
+        Instance mock = new MockInstance("accumulo");
+        def connector = mock.getConnector("user", "pass".bytes)
+        def outtable = "rya_prospects"
+        if (connector.tableOperations().exists(outtable))
+            connector.tableOperations().delete(outtable)
+        connector.tableOperations().create(outtable)
+        connector.securityOperations().createUser("user", "pass".bytes, new Authorizations("U", "FOUO"))
+
+        try {
+            AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO();
+            ryaDAO.setConnector(connector);
+            ryaDAO.init()
+            def confPath = addStatementsAndRunProspector(ryaDAO)
+            ryaDAO.destroy()
+
+            def conf = new Configuration()
+            conf.addResource(confPath)
+
+            // No rdfConf.setAuths(...) here: this test runs the DAO without explicit auths.
+            def rdfConf = new AccumuloRdfConfiguration(conf)
+            def evalDao = new ProspectorServiceEvalStatsDAO(connector, rdfConf)
+            evalDao.init()
+
+            assertExpectedCardinalities(evalDao, rdfConf)
+        } finally {
+            // Drop the prospects table even when an assertion above fails.
+            connector.tableOperations().delete(outtable)
+        }
+    }
+
+    /**
+     * Adds the five fixture statements through {@code ryaDAO} and runs the {@link Prospector}
+     * job with the {@code stats_cluster_config.xml} test resource.
+     *
+     * @return the path of that configuration resource
+     */
+    private Path addStatementsAndRunProspector(AccumuloRyaDAO ryaDAO) {
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata2")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("12")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred"), new RyaType(XMLSchema.INTEGER, "12")))
+        ryaDAO.add(new RyaStatement(new RyaURI("urn:gem:etype#1235"), new RyaURI("urn:gem#pred1"), new RyaType("12")))
+
+        def confFile = "stats_cluster_config.xml"
+        def confPath = new Path(getClass().getClassLoader().getResource(confFile).toString())
+        def args = (String[]) [confPath];
+        ToolRunner.run(new Prospector(), args);
+        return confPath
+    }
+
+    /**
+     * Asserts the predicate and object cardinalities expected for the fixture statements,
+     * including -1 for an object that was never stored.
+     */
+    private void assertExpectedCardinalities(def evalDao, def rdfConf) {
+        List<Value> values = new ArrayList<Value>();
+        values.add(new URIImpl("urn:gem#pred"));
+        def count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.PREDICATE, values)
+        assertEquals(4.0, count, 0.001);
+
+        values = new ArrayList<Value>();
+        values.add(new LiteralImpl("mydata1"));
+        count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+        assertEquals(1.0, count, 0.001);
+
+        values = new ArrayList<Value>();
+        values.add(new LiteralImpl("mydata3"));
+        count = evalDao.getCardinality(rdfConf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values);
+        assertEquals(-1.0, count, 0.001);
+    }
+
+    /** Prints every entry of {@code table} visible with the U and FOUO authorizations. */
+    private void debugTable(def connector, String table) {
+        connector.createScanner(table, new Authorizations((String[]) ["U", "FOUO"])).iterator().each {
+            println it
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java new file mode 100644 index 0000000..007af96 --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAOTest.java @@ -0,0 +1,591 @@ +package mvm.rya.joinselect; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.math.BigDecimal; +import java.math.MathContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.layout.TablePrefixLayoutStrategy; +import mvm.rya.api.persist.RdfEvalStatsDAO; +import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; +import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.helpers.StatementPatternCollector; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.collect.Lists; + +public class 
AccumuloSelectivityEvalDAOTest { + + private static final String DELIM = "\u0000"; + private final byte[] EMPTY_BYTE = new byte[0]; + private final Value EMPTY_VAL = new Value(EMPTY_BYTE); + + private String q1 = ""// + + "SELECT ?h " // + + "{" // + + " ?h <uri:howlsAt> <uri:moon>. "// + + " ?h <http://www.w3.org/2000/01/rdf-schema#label> <uri:dog> ."// + + " ?h <uri:barksAt> <uri:cat> ."// + + " ?h <uri:peesOn> <uri:hydrant> . "// + + "}";// + + private String q2 = ""// + + "SELECT ?h " // + + "{" // + + " <uri:howlsAt> ?h <uri:moon>. "// + + " <http://www.w3.org/2000/01/rdf-schema#label> ?h <uri:dog> ."// + + " <uri:barksAt> ?h <uri:cat> ."// + + " <uri:peesOn> ?h <uri:hydrant> . "// + + "}";// + + private String q3 = ""// + + "SELECT ?h " // + + "{" // + + " <uri:howlsAt> <uri:moon> ?h. "// + + " <http://www.w3.org/2000/01/rdf-schema#label> <uri:dog> ?h ."// + + " <uri:barksAt> ?h <uri:cat> ."// + + " ?h <uri:peesOn> <uri:hydrant> . "// + + "}";// + + private Connector conn; + AccumuloRdfConfiguration arc; + RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> res; + BatchWriterConfig config; + Instance mock; + + @Before + public void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { + + mock = new MockInstance("accumulo"); + PasswordToken pToken = new PasswordToken("pass".getBytes()); + conn = mock.getConnector("user", pToken); + + config = new BatchWriterConfig(); + config.setMaxMemory(1000); + config.setMaxLatency(1000, TimeUnit.SECONDS); + config.setMaxWriteThreads(10); + + if (conn.tableOperations().exists("rya_prospects")) { + conn.tableOperations().delete("rya_prospects"); + } + if (conn.tableOperations().exists("rya_selectivity")) { + conn.tableOperations().delete("rya_selectivity"); + } + + arc = new AccumuloRdfConfiguration(); + res = new ProspectorServiceEvalStatsDAO(conn, arc); + arc.setTableLayoutStrategy(new TablePrefixLayoutStrategy()); + arc.setMaxRangesForScanner(300); + + } + + @Test + 
public void testInitialize() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + TableOperations tos = conn.tableOperations(); + Assert.assertTrue(tos.exists("rya_prospects") && tos.exists("rya_selectivity")); + Assert.assertTrue(accc.isInitialized()); + Assert.assertTrue(accc.getConf().equals(arc)); + Assert.assertTrue(accc.getConnector().equals(conn)); + Assert.assertTrue(accc.getRdfEvalDAO().equals(res)); + + } + + @Test + public void testCardinalityQuery1() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, + MalformedQueryException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setRdfEvalDAO(res); + accc.setConnector(conn); + accc.init(); + + BatchWriter bw = conn.createBatchWriter("rya_prospects", config); + + BatchWriter bw1 = conn.createBatchWriter("rya_selectivity", config); + Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL); + List<Mutation> list = Lists.newArrayList(); + list.add(m); + bw1.addMutations(list); + bw1.close(); + + String s1 = "predicateobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + List<Mutation> mList = new ArrayList<Mutation>(); + Mutation m1, m2, m3; + + Integer tempInt; + Integer tempInt2; + + for (int i = 1; i < 7; i++) { + tempInt = 5 * i; + tempInt2 = 10 - i; + m1 = new Mutation(s1 + DELIM + i); + m1.put(new Text("count"), new Text(""), new Value((tempInt.toString()).getBytes())); + m2 = new Mutation(s2 + DELIM + (7 
- i)); + m2.put(new Text("count"), new Text(""), new Value((tempInt.toString()).getBytes())); + m3 = new Mutation(s3 + DELIM + (10 + i)); + m3.put(new Text("count"), new Text(""), new Value((tempInt2.toString()).getBytes())); + mList.add(m1); + mList.add(m2); + mList.add(m3); + } + + bw.addMutations(mList); + bw.close(); + + List<StatementPattern> spList = getSpList(q1); + long c1 = accc.getCardinality(arc, spList.get(0)); + long c2 = accc.getCardinality(arc, spList.get(1)); + long c3 = accc.getCardinality(arc, spList.get(2)); + long c4 = accc.getCardinality(arc, spList.get(3)); + + Assert.assertTrue(c1 == (long) 0); + Assert.assertTrue(c2 == (long) 5); + Assert.assertTrue(c3 == (long) 30); + Assert.assertTrue(c4 == (long) 9); + + } + + @Test + public void testCardinalityQuery2() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, + MalformedQueryException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + BatchWriter bw = conn.createBatchWriter("rya_prospects", config); + + BatchWriter bw1 = conn.createBatchWriter("rya_selectivity", config); + Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL); + List<Mutation> list = Lists.newArrayList(); + list.add(m); + bw1.addMutations(list); + bw1.close(); + + String s1 = "subjectobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "subjectobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + List<Mutation> mList = new ArrayList<Mutation>(); + Mutation m1, m2, m3; + + Integer tempInt; + Integer tempInt2; + + for (int i = 1; i < 7; i++) { + tempInt = 5 * i; + tempInt2 = 10 - i; + m1 = new Mutation(s1 + DELIM + i); + m1.put(new 
Text("count"), new Text(""), new Value((tempInt.toString()).getBytes())); + m2 = new Mutation(s2 + DELIM + (7 - i)); + m2.put(new Text("count"), new Text(""), new Value((tempInt.toString()).getBytes())); + m3 = new Mutation(s3 + DELIM + (10 + i)); + m3.put(new Text("count"), new Text(""), new Value((tempInt2.toString()).getBytes())); + mList.add(m1); + mList.add(m2); + mList.add(m3); + } + bw.addMutations(mList); + bw.close(); + + List<StatementPattern> spList = getSpList(q2); + long c1 = accc.getCardinality(arc, spList.get(0)); + long c2 = accc.getCardinality(arc, spList.get(1)); + long c3 = accc.getCardinality(arc, spList.get(2)); + long c4 = accc.getCardinality(arc, spList.get(3)); + + Assert.assertTrue(c1 == (long) 0); + Assert.assertTrue(c2 == (long) 5); + Assert.assertTrue(c3 == (long) 30); + Assert.assertTrue(c4 == (long) 9); + + } + + @Test + public void testJoinCardinalityQuery1() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, MalformedQueryException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config); + BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config); + + String s1 = "predicateobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + List<Mutation> mList = new ArrayList<Mutation>(); + List<Mutation> mList2 = new ArrayList<Mutation>(); + List<String> sList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate", + "predicatesubject"); + Mutation m1, m2, m3, m4; + + m1 = new Mutation(s1 + DELIM + "1"); + m1.put(new Text("count"), new Text(""), new 
Value("20".getBytes())); + m2 = new Mutation(s2 + DELIM + "2"); + m2.put(new Text("count"), new Text(""), new Value("15".getBytes())); + m3 = new Mutation(s3 + DELIM + "3"); + m3.put(new Text("count"), new Text(""), new Value("10".getBytes())); + mList.add(m1); + mList.add(m2); + mList.add(m3); + + bw1.addMutations(mList); + bw1.close(); + + m1 = new Mutation(s1); + m2 = new Mutation(s2); + m3 = new Mutation(s3); + int i = 30; + int j = 60; + int k = 90; + Long count1; + Long count2; + Long count3; + + for (String s : sList) { + count1 = (long) i; + count2 = (long) j; + count3 = (long) k; + m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL); + m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL); + m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL); + i = 2 * i; + j = 2 * j; + k = 2 * k; + } + m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m4.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL); + mList2.add(m1); + mList2.add(m2); + mList2.add(m3); + mList2.add(m4); + bw2.addMutations(mList2); + bw2.close(); + + Scanner scan = conn.createScanner("rya_selectivity", new Authorizations()); + scan.setRange(new Range()); + + for (Map.Entry<Key, Value> entry : scan) { + System.out.println("Key row string is " + entry.getKey().getRow().toString()); + System.out.println("Key is " + entry.getKey()); + System.out.println("Value is " + (new String(entry.getKey().getColumnQualifier().toString()))); + + } + + List<StatementPattern> spList = getSpList(q1); + System.out.println(spList); + List<Double> jCardList = new ArrayList<Double>(); + + for (StatementPattern sp1 : spList) { + for (StatementPattern sp2 : spList) { + jCardList.add(accc.getJoinSelect(arc, sp1, sp2)); + } + } + + System.out.println("Join cardinalities are " + jCardList); + + Assert.assertEquals(0, jCardList.get(0), .001); + Assert.assertEquals(0, jCardList.get(3), .001); + Assert.assertEquals(6.0 / 600, jCardList.get(5), .001); + 
Assert.assertEquals(6.0 / 600, jCardList.get(6), .001); + Assert.assertEquals(0 / 600, jCardList.get(8), .001); + Assert.assertEquals(6.0 / 600, jCardList.get(7), .001); + Assert.assertEquals(15.0 / 600, jCardList.get(11), .001); + Assert.assertEquals(6.0 / 600, jCardList.get(13), .001); + Assert.assertEquals(10.0 / 600, jCardList.get(15), .001); + + Assert.assertTrue(jCardList.get(0) == 0); + Assert.assertTrue(jCardList.get(3) == 0); + Assert.assertTrue(jCardList.get(5) == .01); + Assert.assertTrue(jCardList.get(6) == .01); + Assert.assertTrue(jCardList.get(8) == 0); + Assert.assertTrue(jCardList.get(7) == (6.0 / 600)); + Assert.assertTrue(jCardList.get(11) == (1.0 / 40)); + Assert.assertTrue(jCardList.get(13) == .01); + Assert.assertTrue(jCardList.get(15) == (10.0 / 600)); + + } + + @Test + public void testJoinCardinalityQuery2() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, MalformedQueryException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config); + BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config); + + String s1 = "subjectobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "subjectobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + String s4 = "objectsubject" + DELIM + "uri:dog" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label"; + String s5 = "objectsubject" + DELIM + "uri:cat" + DELIM + "uri:barksAt"; + String s6 = "objectsubject" + DELIM + "uri:hydrant" + DELIM + "uri:peesOn"; + List<String> sList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate", + "predicatesubject"); + List<Mutation> mList = new 
ArrayList<Mutation>(); + List<Mutation> mList2 = new ArrayList<Mutation>(); + Mutation m1, m2, m3, m4; + + m1 = new Mutation(s1 + DELIM + "1"); + m1.put(new Text("count"), new Text(""), new Value("2".getBytes())); + m2 = new Mutation(s2 + DELIM + "2"); + m2.put(new Text("count"), new Text(""), new Value("4".getBytes())); + m3 = new Mutation(s3 + DELIM + "3"); + m3.put(new Text("count"), new Text(""), new Value("6".getBytes())); + mList.add(m1); + mList.add(m2); + mList.add(m3); + + bw1.addMutations(mList); + bw1.close(); + + m1 = new Mutation(s4); + m2 = new Mutation(s5); + m3 = new Mutation(s6); + int i = 5; + int j = 6; + int k = 7; + Long count1; + Long count2; + Long count3; + + for (String s : sList) { + count1 = (long) i; + count2 = (long) j; + count3 = (long) k; + m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL); + m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL); + m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL); + i = 2 * i; + j = 2 * j; + k = 2 * k; + } + m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m4.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL); + mList2.add(m1); + mList2.add(m2); + mList2.add(m3); + mList2.add(m4); + bw2.addMutations(mList2); + bw2.close(); + + List<StatementPattern> spList = getSpList(q2); + // System.out.println(spList); + List<Double> jCardList = new ArrayList<Double>(); + + for (StatementPattern sp1 : spList) { + for (StatementPattern sp2 : spList) { + jCardList.add(accc.getJoinSelect(arc, sp1, sp2)); + } + } + + System.out.println("Join cardinalities are " + jCardList); + + Assert.assertEquals(0, jCardList.get(0), .001); + Assert.assertEquals(0, jCardList.get(3), .001); + Assert.assertEquals(2.0 / 600, jCardList.get(5), .001); + Assert.assertEquals(4.0 / 600, jCardList.get(6), .001); + Assert.assertEquals(.0 / 600, jCardList.get(8), .001); + Assert.assertEquals(6. / 600, jCardList.get(7), .001); + Assert.assertEquals(6. 
/ 600, jCardList.get(11), .001); + + Assert.assertTrue(jCardList.get(0) == 0); + Assert.assertTrue(jCardList.get(3) == 0); + Assert.assertTrue(jCardList.get(5) == (1.0 / 300)); + Assert.assertTrue(jCardList.get(6) == (4.0 / 600)); + Assert.assertTrue(jCardList.get(8) == 0); + Assert.assertTrue(jCardList.get(7) == .01); + Assert.assertTrue(jCardList.get(11) == .01); + + } + + @Test + public void testJoinCardinalityQuery3() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, MalformedQueryException { + + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config); + BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config); + + String s1 = "subjectpredicate" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "subjectobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + String s4 = "subjectpredicate" + DELIM + "uri:howlsAt" + DELIM + "uri:moon"; + String s5 = "objectsubject" + DELIM + "uri:cat" + DELIM + "uri:barksAt"; + + List<String> sList = Arrays.asList("subjectobject", "objectsubject", "objectobject", "objectpredicate", "subjectpredicate", + "subjectsubject", "predicateobject", "predicatepredicate", "predicatesubject"); + List<Mutation> mList = new ArrayList<Mutation>(); + List<Mutation> mList2 = new ArrayList<Mutation>(); + Mutation m1, m2, m3, m4, m5, m6; + + m1 = new Mutation(s1 + DELIM + "1"); + m1.put(new Text("count"), new Text(""), new Value("15".getBytes())); + m2 = new Mutation(s2 + DELIM + "2"); + m2.put(new Text("count"), new Text(""), new Value("11".getBytes())); + m3 = new Mutation(s3 + DELIM + "3"); + m3.put(new Text("count"), new Text(""), new Value("13".getBytes())); + m4 = new Mutation(s4 + DELIM + "8"); + 
m4.put(new Text("count"), new Text(""), new Value("20".getBytes())); + m5 = new Mutation(s4 + DELIM + "2"); + m5.put(new Text("count"), new Text(""), new Value("10".getBytes())); + + mList.add(m1); + mList.add(m2); + mList.add(m3); + mList.add(m4); + mList.add(m5); + + bw1.addMutations(mList); + bw1.close(); + + m1 = new Mutation(s1); + m2 = new Mutation(s5); + m3 = new Mutation(s3); + m4 = new Mutation(s4); + int i = 5; + int j = 6; + int k = 7; + int l = 8; + Long count1; + Long count2; + Long count3; + Long count4; + + for (String s : sList) { + count1 = (long) i; + count2 = (long) j; + count3 = (long) k; + count4 = (long) l; + m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL); + m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL); + m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL); + m4.put(new Text(s), new Text(count4.toString()), EMPTY_VAL); + i = 2 * i; + j = 2 * j; + k = 2 * k; + l = 2 * l; + } + m5 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m5.put(new Text("FullTableCardinality"), new Text("600"), EMPTY_VAL); + mList2.add(m1); + mList2.add(m2); + mList2.add(m3); + mList2.add(m4); + mList2.add(m5); + bw2.addMutations(mList2); + bw2.close(); + + List<StatementPattern> spList = getSpList(q3); + System.out.println(spList); + List<Double> jCardList = new ArrayList<Double>(); + + for (StatementPattern sp1 : spList) { + for (StatementPattern sp2 : spList) { + jCardList.add(accc.getJoinSelect(arc, sp1, sp2)); + } + } + + MathContext mc = new MathContext(3); + + Assert.assertEquals(3.2 / 600, jCardList.get(0), .001); + Assert.assertEquals(0.5384615384615384 / 600, jCardList.get(3), .001); + Assert.assertEquals(1.3333333333333333 / 600, jCardList.get(5), .001); + Assert.assertEquals(2.6666666666666665 / 600, jCardList.get(6), .001); + Assert.assertEquals(6.4 / 600, jCardList.get(8), .001); + Assert.assertEquals(13. 
/ 600, jCardList.get(15), .001); + + Assert.assertTrue(new BigDecimal(jCardList.get(2)).round(mc).equals(new BigDecimal(64.0 / 6000).round(mc))); + Assert.assertTrue(new BigDecimal(jCardList.get(7)).round(mc).equals(new BigDecimal(7.0 / 7800).round(mc))); + Assert.assertTrue(new BigDecimal(jCardList.get(14)).round(mc).equals(new BigDecimal(112.0 / 7800).round(mc))); + + } + + private List<StatementPattern> getSpList(String query) throws MalformedQueryException { + + SPARQLParser sp = new SPARQLParser(); + ParsedQuery pq = sp.parseQuery(query, null); + TupleExpr te = pq.getTupleExpr(); + + return StatementPatternCollector.process(te); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java new file mode 100644 index 0000000..eb480ad --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityIdentityReducerTest.java @@ -0,0 +1,140 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import mvm.rya.joinselect.mr.utils.CardList; +import mvm.rya.joinselect.mr.utils.TripleEntry; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.junit.Test; + +public class CardinalityIdentityReducerTest { + + private static final String DELIM = "\u0000"; + + @Test + public void testCIReducerOneConstant() throws InterruptedException, IOException { + + TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new Text(""), new Text("subject"), new Text(""), new Text("object")); + CardList cL1 = new CardList(1, 2, 3, 0, 0, 0); + CardList cL2 = new CardList(4, 5, 6, 0, 0, 0); + CardList cl = new CardList(5, 7, 9, 0, 0, 0); + List<CardList> list = new ArrayList<CardList>(); + list.add(cL1); + list.add(cL2); + + Text row = new Text(te.getFirstPos().toString() + DELIM + te.getFirst().toString()); + Mutation m1 = new Mutation(row); + m1.put(new Text(te.getKeyPos().toString() + "subject"), new Text(cl.getcardS().toString()), new Value(new byte[0])); + Mutation m2 = new Mutation(row); + m2.put(new Text(te.getKeyPos().toString() + "predicate"), new Text(cl.getcardP().toString()), new Value(new byte[0])); + Mutation m3 = new Mutation(row); + m3.put(new Text(te.getKeyPos().toString() + "object"), new Text(cl.getcardO().toString()), new Value(new byte[0])); + Text table = new Text(""); + + new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list) + .withOutput(table, m1).withOutput(table, m2).withOutput(table, m3).runTest(); + + } + + @Test + public void testCIReducerTwoConstant() throws InterruptedException, IOException { + + TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new Text("urn:gem#pred"), new Text("subject"), new 
Text("predicate"), new Text("object")); + CardList cL1 = new CardList(1, 2, 3, 0, 0, 0); + CardList cL2 = new CardList(4, 5, 6, 0, 0, 0); + CardList cl = new CardList(5, 7, 9, 0, 0, 0); + List<CardList> list = new ArrayList<CardList>(); + list.add(cL1); + list.add(cL2); + + Text row = new Text(te.getFirstPos().toString() + te.getSecondPos().toString() + DELIM + te.getFirst().toString() + DELIM + te.getSecond()); + Mutation m1 = new Mutation(row); + m1.put(new Text(te.getKeyPos().toString() + "subject"), new Text(cl.getcardS().toString()), new Value(new byte[0])); + Mutation m2 = new Mutation(row); + m2.put(new Text(te.getKeyPos().toString() + "predicate"), new Text(cl.getcardP().toString()), new Value(new byte[0])); + Mutation m3 = new Mutation(row); + m3.put(new Text(te.getKeyPos().toString() + "object"), new Text(cl.getcardO().toString()), new Value(new byte[0])); + Text table = new Text(""); + + new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list) + .withOutput(table, m1).withOutput(table, m2).withOutput(table, m3).runTest(); + + } + + @Test + public void testJoinTwoVars() throws InterruptedException, IOException { + + TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new Text(""), new Text("subject"), new Text(""), new Text("predicateobject")); + CardList cL1 = new CardList(0, 0, 0, 1, 2, 3); + CardList cL2 = new CardList(0, 0, 0, 4, 5, 6); + CardList cl = new CardList(0, 0, 0, 5, 7, 9); + List<CardList> list = new ArrayList<CardList>(); + list.add(cL1); + list.add(cL2); + + Text row = new Text(te.getFirstPos().toString() + DELIM + te.getFirst().toString()); + Mutation m1 = new Mutation(row); + m1.put(new Text(te.getKeyPos().toString() + "subjectpredicate"), new Text(cl.getcardSP().toString()), new Value(new byte[0])); + Mutation m2 = new Mutation(row); + m2.put(new Text(te.getKeyPos().toString() + "predicateobject"), new Text(cl.getcardPO().toString()), 
new Value(new byte[0])); + Mutation m3 = new Mutation(row); + m3.put(new Text(te.getKeyPos().toString() + "objectsubject"), new Text(cl.getcardSO().toString()), new Value(new byte[0])); + Text table = new Text(""); + + new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list) + .withOutput(table, m1).withOutput(table, m2).withOutput(table, m3).runTest(); + + } + + @Test + public void testJoinTwoVarsReverseOrder() throws InterruptedException, IOException { + + TripleEntry te = new TripleEntry(new Text("urn:gem:etype#1234"), new Text(""), new Text("subject"), new Text(""), new Text("objectpredicate")); + CardList cL1 = new CardList(0, 0, 0, 1, 2, 3); + CardList cL2 = new CardList(0, 0, 0, 4, 5, 6); + CardList cl = new CardList(0, 0, 0, 5, 7, 9); + List<CardList> list = new ArrayList<CardList>(); + list.add(cL1); + list.add(cL2); + + Text row = new Text(te.getFirstPos().toString() + DELIM + te.getFirst().toString()); + Mutation m1 = new Mutation(row); + m1.put(new Text("predicateobject" + "predicatesubject"), new Text(cl.getcardSP().toString()), new Value(new byte[0])); + Mutation m2 = new Mutation(row); + m2.put(new Text("predicateobject" + "objectpredicate"), new Text(cl.getcardPO().toString()), new Value(new byte[0])); + Mutation m3 = new Mutation(row); + m3.put(new Text("predicateobject" + "subjectobject"), new Text(cl.getcardSO().toString()), new Value(new byte[0])); + Text table = new Text(""); + + new ReduceDriver<TripleEntry,CardList,Text,Mutation>().withReducer(new JoinSelectStatisticsSum.CardinalityIdentityReducer()).withInput(te, list) + .withOutput(table, m1).withOutput(table, m2).withOutput(table, m3).runTest(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java new file mode 100644 index 0000000..8647294 --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/CardinalityMapperTest.java @@ -0,0 +1,75 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; + +import mvm.rya.joinselect.mr.JoinSelectProspectOutput; +import mvm.rya.joinselect.mr.utils.CardinalityType; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.TripleCard; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.junit.Test; + +public class CardinalityMapperTest { + + private static final String DELIM = "\u0000"; + + public enum TripleValueType { + subject, predicate, object, subjectpredicate, predicateobject, subjectobject + } + + @Test + public void testOutput() throws InterruptedException, IOException { + + String s = "urn:gem:etype#1234"; + String p = "urn:gem#pred"; + + Text t1 = new Text(TripleValueType.subject.name() + DELIM + s + DELIM + 1); + Text t2 = new Text(TripleValueType.predicate.name() + DELIM + p + DELIM + 2); + Text t3 = new Text(TripleValueType.subjectpredicate.name() + DELIM + s + DELIM + p + DELIM + 3); + + byte[] b = new byte[0]; + byte[] c = "25".getBytes(); + byte[] d = "47".getBytes(); + byte[] e = "15".getBytes(); + + Key key1 = new Key(t1.getBytes(), b, b, b, 1); + Key key2 = new Key(t2.getBytes(), b, b, b, 1); + Key key3 = new Key(t3.getBytes(), b, b, b, 1); + Value val1 = new Value(c); + Value val2 = new Value(d); + Value val3 = new Value(e); + + // System.out.println("Keys are " + key1 + " and " + key2); + + new MapDriver<Key,Value,CompositeType,TripleCard>().withMapper(new JoinSelectProspectOutput.CardinalityMapper()).withInput(key1, val1) + .withInput(key2, val2).withInput(key3, val3).withOutput(new CompositeType(s, 1), new TripleCard(new CardinalityType(25, "subject", 1))) + .withOutput(new CompositeType(p, 1), new TripleCard(new CardinalityType(47, "predicate", 2))) + .withOutput(new CompositeType(s + DELIM + p, 1), new TripleCard(new CardinalityType(15, "subjectpredicate", 3))).runTest(); + + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java new file mode 100644 index 0000000..c070488 --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/FullTableSizeTest.java @@ -0,0 +1,63 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; + +import mvm.rya.joinselect.mr.FullTableSize; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; +import org.junit.Test; + +//TODO fix table names! 
+ +public class FullTableSizeTest { + + private static final String DELIM = "\u0000"; + + @Test + public void testFullTableSize() throws IOException { + + Value value = new Value(new byte[0]); + + Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m.put(new Text("FullTableCardinality"), new Text("15"), new Value(new byte[0])); + + new MapReduceDriver<Key, Value, Text, IntWritable, Text, Mutation>() + .withMapper(new FullTableSize.FullTableMapper()).withInput(new Key(new Text("entry1")), value) + .withInput(new Key(new Text("entry2")), value).withInput(new Key(new Text("entry3")), value) + .withInput(new Key(new Text("entry4")), value).withInput(new Key(new Text("entry5")), value) + .withInput(new Key(new Text("entry6")), value).withInput(new Key(new Text("entry7")), value) + .withInput(new Key(new Text("entry8")), value).withInput(new Key(new Text("entry9")), value) + .withInput(new Key(new Text("entry10")), value).withInput(new Key(new Text("entry11")), value) + .withInput(new Key(new Text("entry12")), value).withInput(new Key(new Text("entry13")), value) + .withInput(new Key(new Text("entry14")), value).withInput(new Key(new Text("entry15")), value) + .withCombiner(new FullTableSize.FullTableCombiner()).withReducer(new FullTableSize.FullTableReducer()) + .withOutput(new Text(""), m).runTest(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java new file mode 100644 index 0000000..5d2acb7 --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinReducerTest.java @@ -0,0 +1,123 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector 
+ * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import mvm.rya.joinselect.mr.JoinSelectAggregate; +import mvm.rya.joinselect.mr.utils.CardList; +import mvm.rya.joinselect.mr.utils.CardinalityType; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.TripleCard; +import mvm.rya.joinselect.mr.utils.TripleEntry; + +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.junit.Test; + +public class JoinReducerTest { + + private static final String DELIM = "\u0000"; + + @Test + public void testSingleConstCard() throws InterruptedException, IOException { + + CompositeType ct = new CompositeType("urn:gem:etype#1234", 1); + TripleEntry te = new TripleEntry("urn:gem#pred", "urn:gem:etype#4567", "predicate", "object", "subject"); + CardinalityType c5 = new CardinalityType(45, "object", 0); + CardinalityType c1 = new CardinalityType(25, "subject", 2); + CardinalityType c2 = new CardinalityType(27, "predicate", 2); + CardinalityType c3 = new CardinalityType(29, "object", 2); + CardinalityType c4 = new CardinalityType(31, "predicate", 1); + List<TripleCard> list = new ArrayList<TripleCard>(); + list.add(new TripleCard(c1)); + list.add(new TripleCard(c2)); + list.add(new TripleCard(c3)); + list.add(new TripleCard(c4)); + list.add(new TripleCard(c5)); + list.add(new TripleCard(te)); + 
System.out.println("List is " + list); + + new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>().withReducer(new JoinSelectAggregate.JoinReducer()).withInput(ct, list) + .withOutput(te, new CardList(25, 31, 45, 0, 0, 0)).runTest(); + + } + + @Test + public void testTwoTripleEntry() throws InterruptedException, IOException { + + CompositeType ct = new CompositeType("urn:gem:etype#1234", 1); + TripleEntry te1 = new TripleEntry("urn:gem#pred", "urn:gem:etype#4567", "predicate", "object", "subject"); + TripleEntry te2 = new TripleEntry("urn:gem#8910", "urn:gem:etype#4567", "subject", "predicate", "object"); + CardinalityType c5 = new CardinalityType(45, "object", 0); + CardinalityType c1 = new CardinalityType(25, "subject", 2); + CardinalityType c2 = new CardinalityType(27, "predicate", 2); + CardinalityType c3 = new CardinalityType(29, "object", 2); + CardinalityType c4 = new CardinalityType(31, "predicate", 1); + List<TripleCard> list = new ArrayList<TripleCard>(); + list.add(new TripleCard(c1)); + list.add(new TripleCard(c2)); + list.add(new TripleCard(c3)); + list.add(new TripleCard(c4)); + list.add(new TripleCard(c5)); + list.add(new TripleCard(te1)); + list.add(new TripleCard(te2)); + System.out.println("List is " + list); + + new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>().withReducer(new JoinSelectAggregate.JoinReducer()).withInput(ct, list) + .withOutput(te1, new CardList(25, 31, 45, 0, 0, 0)).withOutput(te2, new CardList(25, 31, 45, 0, 0, 0)).runTest(); + + } + + @Test + public void testTwoConstCard() throws InterruptedException, IOException { + + CompositeType ct1 = new CompositeType("urn:gem#pred" + DELIM + "urn:gem:etype#1234", 1); + TripleEntry te1 = new TripleEntry("uri:testSubject", "", "subject", "", "predicateobject"); + TripleEntry te2 = new TripleEntry("uri:testSubject", "", "subject", "", "objectpredicate"); + + CardinalityType c5 = new CardinalityType(45, "subjectobject", 0); + CardinalityType c1 = new 
CardinalityType(25, "subjectobject", 2); + CardinalityType c2 = new CardinalityType(27, "predicateobject", 5); + CardinalityType c3 = new CardinalityType(29, "predicateobject", 2); + CardinalityType c4 = new CardinalityType(31, "subjectpredicate", 1); + CardinalityType c6 = new CardinalityType(56, "subjectpredicate", 2); + + List<TripleCard> list1 = new ArrayList<TripleCard>(); + + list1.add(new TripleCard(c1)); + list1.add(new TripleCard(c2)); + list1.add(new TripleCard(c3)); + list1.add(new TripleCard(c4)); + list1.add(new TripleCard(c5)); + list1.add(new TripleCard(c6)); + list1.add(new TripleCard(te1)); + list1.add(new TripleCard(te2)); + + // System.out.println("List is " + list); + + new ReduceDriver<CompositeType,TripleCard,TripleEntry,CardList>().withReducer(new JoinSelectAggregate.JoinReducer()).withInput(ct1, list1) + .withOutput(te1, new CardList(0, 0, 0, 31, 29, 45)).withOutput(te2, new CardList(0, 0, 0, 31, 29, 45)).runTest(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java new file mode 100644 index 0000000..e9281da --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectMapperTest.java @@ -0,0 +1,93 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.Map; + +import mvm.rya.joinselect.mr.JoinSelectSpoTableOutput; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.TripleCard; +import mvm.rya.joinselect.mr.utils.TripleEntry; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolver; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.junit.Test; + +public class JoinSelectMapperTest { + + private static final String DELIM = "\u0000"; + + @Test + public void testOutput() throws TripleRowResolverException, IOException { + + RyaStatement rya = new RyaStatement(new RyaURI("urn:gem:etype#1234"), new RyaURI("urn:gem#pred"), new RyaType("mydata1")); + Text s = new Text(rya.getSubject().getData()); + Text p = new Text(rya.getPredicate().getData()); + Text o = new Text(rya.getObject().getData()); + Text sp = new Text(rya.getSubject().getData() + DELIM + rya.getPredicate().getData()); + Text so = new Text(rya.getSubject().getData() + DELIM + rya.getObject().getData()); + Text po = new 
Text(rya.getPredicate().getData() + DELIM + rya.getObject().getData()); + Text ps = new Text(rya.getPredicate().getData() + DELIM + rya.getSubject().getData()); + Text op = new Text(rya.getObject().getData() + DELIM + rya.getPredicate().getData()); + Text os = new Text(rya.getObject().getData() + DELIM + rya.getSubject().getData()); + + TripleEntry t1 = new TripleEntry(s, p, new Text("subject"), new Text("predicate"), new Text("object")); + TripleEntry t2 = new TripleEntry(p, o, new Text("predicate"), new Text("object"), new Text("subject")); + TripleEntry t3 = new TripleEntry(o, s, new Text("object"), new Text("subject"), new Text("predicate")); + TripleEntry t4 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("subjectpredicate")); + TripleEntry t5 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("objectsubject")); + TripleEntry t6 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("predicateobject")); + TripleEntry t7 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("objectpredicate")); + TripleEntry t8 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("subjectobject")); + TripleEntry t9 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("predicatesubject")); + + TripleRowResolver trr = new WholeRowTripleResolver(); + Map<TABLE_LAYOUT,TripleRow> map = trr.serialize(rya); + System.out.println(map); + TripleRow tr = map.get(TABLE_LAYOUT.SPO); + System.out.println("Triple row is" + tr); + System.out.println("ColumnV is " + tr.getTimestamp()); + byte[] b = new byte[0]; + Key key = new Key(tr.getRow(), tr.getColumnFamily(), tr.getColumnQualifier(), b, 1); + Value val = new Value(b); + + new MapDriver<Key,Value,CompositeType,TripleCard>().withMapper(new JoinSelectSpoTableOutput.JoinSelectMapper()).withInput(key, val) + .withOutput(new CompositeType(o, new IntWritable(2)), new 
TripleCard(t1)).withOutput(new CompositeType(s, new IntWritable(2)), new TripleCard(t2)) + .withOutput(new CompositeType(p, new IntWritable(2)), new TripleCard(t3)).withOutput(new CompositeType(po, new IntWritable(2)), new TripleCard(t6)) + .withOutput(new CompositeType(so, new IntWritable(2)), new TripleCard(t5)).withOutput(new CompositeType(sp, new IntWritable(2)), new TripleCard(t4)) + .withOutput(new CompositeType(op, new IntWritable(2)), new TripleCard(t7)).withOutput(new CompositeType(os, new IntWritable(2)), new TripleCard(t8)) + .withOutput(new CompositeType(ps, new IntWritable(2)), new TripleCard(t9)).runTest(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java new file mode 100644 index 0000000..c00f8ff --- /dev/null +++ b/extras/rya.prospector/src/test/java/mvm/rya/joinselect/mr/JoinSelectProspectOutputTest.java @@ -0,0 +1,69 @@ +package mvm.rya.joinselect.mr; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import java.io.IOException; + +import mvm.rya.joinselect.mr.JoinSelectProspectOutput; +import mvm.rya.joinselect.mr.utils.CardinalityType; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.TripleCard; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.junit.Test; + +public class JoinSelectProspectOutputTest { + + private static final String DELIM = "\u0000"; + + public enum TripleValueType { + subject, predicate, object, 
subjectpredicate, predicateobject, subjectobject + } + + @Test + public void testOutput() throws InterruptedException, IOException { + + String s = "urn:gem:etype#1234"; + String p = "urn:gem#pred"; + + String ts = "798497748386999999"; + + Text t1 = new Text(TripleValueType.subject.name() + DELIM + s + DELIM + 1); + Text t2 = new Text(TripleValueType.predicate.name() + DELIM + p + DELIM + 2); + Text t3 = new Text(TripleValueType.subjectpredicate.name() + DELIM + s + DELIM + p + DELIM + ts); + + byte[] b = new byte[0]; + byte[] c = "25".getBytes(); + byte[] d = "47".getBytes(); + byte[] e = "15".getBytes(); + + Key key1 = new Key(t1.getBytes(), b, b, b, 1); + Key key2 = new Key(t2.getBytes(), b, b, b, 1); + Key key3 = new Key(t3.getBytes(), b, b, b, 1); + Value val1 = new Value(c); + Value val2 = new Value(d); + Value val3 = new Value(e); + + + + // System.out.println("Keys are " + key1 + " and " + key2); + + new MapDriver<Key, Value, CompositeType, TripleCard>() + .withMapper(new JoinSelectProspectOutput.CardinalityMapper()) + .withInput(key1, val1) + .withInput(key2, val2) + .withInput(key3, val3) + .withOutput(new CompositeType(s, 1), new TripleCard(new CardinalityType(25, "subject", 1))) + .withOutput(new CompositeType(p, 1), new TripleCard(new CardinalityType(47, "predicate", 2))) + .withOutput(new CompositeType(s + DELIM + p, 1), + new TripleCard(new CardinalityType(15, "subjectpredicate", Long.parseLong(ts)))).runTest(); + + } + +}
