http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java new file mode 100644 index 0000000..2551625 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java @@ -0,0 +1,639 @@
package mvm.rya.joinselect;

/*
 * #%L
 * mvm.rya.rya.prospector
 * %%
 * Copyright (C) 2014 - 2015 Rya
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import mvm.rya.accumulo.AccumuloRdfUtils;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.layout.TableLayoutStrategy;
import mvm.rya.api.persist.RdfDAOException;
import mvm.rya.api.persist.RdfEvalStatsDAO;
import mvm.rya.api.persist.joinselect.SelectivityEvalDAO;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.query.algebra.QueryModelNode;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

/**
 * Estimates join selectivities from the counts stored in the Accumulo selectivity table and
 * delegates single-pattern cardinalities to an {@link RdfEvalStatsDAO}.
 *
 * <p>
 * Join cardinalities read from the table are memoised in an in-memory map keyed by
 * {@code row + DELIM + joinType}; the full table size is memoised in a field.
 */
public class AccumuloSelectivityEvalDAO implements SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> {

    private static final String DELIM = "\u0000";

    /** Row that holds the total triple count in the selectivity table. */
    private static final String FULL_TABLE_ROW = "subjectpredicateobject" + DELIM + "FullTableCardinality";

    /** Column family of the total triple count entry; the count itself is the column qualifier. */
    private static final String FULL_TABLE_FAMILY = "FullTableCardinality";

    /** Selectivity assumed per shared binding name when no table statistics apply. */
    private static final double PER_BINDING_SELECTIVITY = 1.0 / 10000.0;

    private boolean initialized = false;
    private RdfCloudTripleStoreConfiguration conf;
    private Connector connector;
    private TableLayoutStrategy tableLayoutStrategy;
    private boolean filtered = false;
    private boolean denormalized = false;
    private int fullTableCardinality = 0;
    private final Map<String,Long> joinMap = new HashMap<String,Long>();
    private RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> resd;

    /** Failure recorded by the configuration-only constructor; reported by {@link #init()}. */
    private Exception connectionFailure;

    /**
     * Creates an unconfigured DAO. A configuration, a connector and an {@link RdfEvalStatsDAO}
     * must be supplied through the setters before {@link #init()} is called.
     */
    public AccumuloSelectivityEvalDAO() {
    }

    /**
     * Creates a DAO that uses the given configuration and an already established connector.
     *
     * @param conf the store configuration
     * @param connector the Accumulo connector used for all scans
     */
    public AccumuloSelectivityEvalDAO(RdfCloudTripleStoreConfiguration conf, Connector connector) {
        this.conf = conf;
        this.connector = connector;
    }

    /**
     * Creates a DAO and connects to Accumulo using the {@code sc.cloudbase.*} properties of the
     * configuration.
     *
     * <p>
     * A connection failure does not abort construction; it is remembered and becomes the cause of
     * the {@link RdfDAOException} thrown by {@link #init()}, unless a connector is supplied through
     * {@link #setConnector(Connector)} in the meantime.
     *
     * @param conf the store configuration holding instance name, zookeepers, user name and password
     */
    public AccumuloSelectivityEvalDAO(RdfCloudTripleStoreConfiguration conf) {
        this.conf = conf;
        Instance inst = new ZooKeeperInstance(conf.get("sc.cloudbase.instancename"), conf.get("sc.cloudbase.zookeepers"));
        try {
            this.connector = inst.getConnector(conf.get("sc.cloudbase.username"), conf.get("sc.cloudbase.password"));
        } catch (AccumuloException e) {
            connectionFailure = e;
        } catch (AccumuloSecurityException e) {
            connectionFailure = e;
        }
    }

    /**
     * Initialises the wrapped {@link RdfEvalStatsDAO} if necessary and makes sure the selectivity
     * and prospects tables exist.
     *
     * @throws RdfDAOException wrapping any failure, including being already initialised, a missing
     *         statistics DAO or a missing connector
     */
    @Override
    public void init() throws RdfDAOException {
        try {
            if (isInitialized()) {
                throw new IllegalStateException("Already initialized");
            }
            checkNotNull(resd, "RdfEvalStatsDAO has not been set");
            if (!resd.isInitialized()) {
                resd.init();
            }
            if (connector == null && connectionFailure != null) {
                // Surface the real reason the constructor could not connect.
                throw new IllegalStateException("Unable to connect to Accumulo", connectionFailure);
            }
            checkNotNull(connector, "Connector has not been set");
            tableLayoutStrategy = conf.getTableLayoutStrategy();
            TableOperations tos = connector.tableOperations();
            AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getSelectivity());
            AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getProspects());
            initialized = true;
        } catch (Exception e) {
            throw new RdfDAOException(e);
        }
    }

    /**
     * Marks this DAO as no longer initialised.
     *
     * @throws IllegalStateException if the DAO is not initialised
     */
    @Override
    public void destroy() throws RdfDAOException {
        if (!isInitialized()) {
            throw new IllegalStateException("Not initialized");
        }
        initialized = false;
    }

    @Override
    public boolean isInitialized() throws RdfDAOException {
        return initialized;
    }

    public Connector getConnector() {
        return connector;
    }

    public void setConnector(Connector connector) {
        this.connector = connector;
    }

    @Override
    public RdfCloudTripleStoreConfiguration getConf() {
        return conf;
    }

    @Override
    public void setConf(RdfCloudTripleStoreConfiguration conf) {
        this.conf = conf;
    }

    public RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> getRdfEvalDAO() {
        return resd;
    }

    public void setRdfEvalDAO(RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> resd) {
        this.resd = resd;
    }

    /** Selects the "filtered" selectivity formula (ignored when denormalized is set). */
    public void setFiltered(boolean filtered) {
        this.filtered = filtered;
    }

    /** When set, join estimates are returned as raw cardinalities instead of ratios. */
    public void setDenormalized(boolean denormalize) {
        this.denormalized = denormalize;
    }

    /**
     * Estimates the selectivity of joining two statement patterns.
     *
     * @return 1 when the patterns share no variable, 0 when either pattern has cardinality 0,
     *         otherwise the estimate selected by the filtered/denormalized flags
     */
    private double getJoinSelect(RdfCloudTripleStoreConfiguration conf, StatementPattern sp1, StatementPattern sp2) throws TableNotFoundException {

        if (fullTableCardinality == 0) {
            getTableSize(conf);
        }

        Authorizations authorizations = getAuths(conf);
        String row1 = CardinalityCalcUtil.getRow(sp1, true);
        String row2 = CardinalityCalcUtil.getRow(sp2, true);
        List<String> joinType = CardinalityCalcUtil.getJoins(sp1, sp2);

        if (joinType.size() == 0) {
            return 1;
        }

        long count1 = getCardinality(conf, sp1);
        long count2 = getCardinality(conf, sp2);
        if (count1 == 0 || count2 == 0) {
            return 0;
        }

        if (joinType.size() == 2) {
            // One shared variable: one join cardinality per pattern.
            long card1 = lookupJoinCard(row1, joinType.get(0), authorizations);
            long card2 = lookupJoinCard(row2, joinType.get(1), authorizations);
            return toSelectivity(card1, card2, count1, count2, true);
        }

        // Two shared variables: each pattern contributes the smaller of its two join cardinalities.
        long card1 = lookupMinJoinCard(row1, joinType.get(0), joinType.get(1), authorizations);
        long card2 = lookupMinJoinCard(row2, joinType.get(2), joinType.get(3), authorizations);
        return toSelectivity(card1, card2, count1, count2, false);
    }

    /**
     * Turns two join cardinalities and two pattern cardinalities into the final estimate.
     *
     * @param boundByCounts whether the unfiltered estimate is additionally capped by the larger
     *        pattern-count ratio (used for single-variable joins only)
     */
    private double toSelectivity(long card1, long card2, long count1, long count2, boolean boundByCounts) {
        if (denormalized) {
            return Math.min(card1, card2);
        }
        if (filtered) {
            return Math.min(((double) card1 * count2) / ((double) count1 * fullTableCardinality * fullTableCardinality),
                    ((double) card2 * count1) / ((double) count2 * fullTableCardinality * fullTableCardinality));
        }
        double joinBound = Math.min(((double) card1) / ((double) count1 * fullTableCardinality),
                ((double) card2) / ((double) count2 * fullTableCardinality));
        if (!boundByCounts) {
            return joinBound;
        }
        double countBound = Math.max((double) count1 / fullTableCardinality, (double) count2 / fullTableCardinality);
        // TODO maybe change back to original form as countBound will rarely be less than joinBound.
        return Math.min(joinBound, countBound);
    }

    /** Opens a scanner over the selectivity table restricted to rows starting with {@code row}. */
    private Scanner scanSelectivityRow(String row, Authorizations auths) throws TableNotFoundException {
        Scanner scanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), auths);
        scanner.setRange(Range.prefix(row));
        return scanner;
    }

    /**
     * Returns the join cardinality stored under {@code row} for the given column family, reading
     * it from the cache when present and caching it after a successful scan.
     *
     * @return the stored cardinality, or 0 when the table holds no matching entry
     */
    private long lookupJoinCard(String row, String family, Authorizations auths) throws TableNotFoundException {
        String cacheKey = row + DELIM + family;
        Long cached = joinMap.get(cacheKey);
        if (cached != null) {
            return cached;
        }

        long card = 0;
        for (Map.Entry<Key,Value> entry : scanSelectivityRow(row, auths)) {
            if (entry.getKey().getColumnFamily().toString().equals(family)) {
                card = CardinalityCalcUtil.getJCard(entry.getKey());
                joinMap.put(cacheKey, card);
                break;
            }
        }
        return card;
    }

    /**
     * Returns the smaller of the two join cardinalities stored under {@code row} for the two given
     * column families.
     *
     * <p>
     * The minimum is applied on the cached path as well as on the scan path, so repeated calls for
     * the same pair of patterns return the same estimate regardless of cache state.
     *
     * @return the minimum when both entries exist; otherwise whatever was found for
     *         {@code firstFamily} (0 if nothing)
     */
    private long lookupMinJoinCard(String row, String firstFamily, String secondFamily, Authorizations auths) throws TableNotFoundException {
        String firstKey = row + DELIM + firstFamily;
        String secondKey = row + DELIM + secondFamily;

        Long cachedFirst = joinMap.get(firstKey);
        Long cachedSecond = joinMap.get(secondKey);
        if (cachedFirst != null && cachedSecond != null) {
            return Math.min(cachedFirst, cachedSecond);
        }

        long first = 0;
        long second = 0;
        boolean foundFirst = false;
        boolean foundSecond = false;

        for (Map.Entry<Key,Value> entry : scanSelectivityRow(row, auths)) {
            String family = entry.getKey().getColumnFamily().toString();
            if (family.equals(firstFamily)) {
                first = CardinalityCalcUtil.getJCard(entry.getKey());
                joinMap.put(firstKey, first);
                foundFirst = true;
            } else if (family.equals(secondFamily)) {
                second = CardinalityCalcUtil.getJCard(entry.getKey());
                joinMap.put(secondKey, second);
                foundSecond = true;
            } else {
                continue;
            }
            if (foundFirst && foundSecond) {
                first = Math.min(first, second);
                break;
            }
        }
        return first;
    }

    /**
     * Estimates the selectivity of joining a statement pattern with an arbitrary tuple expression
     * by taking the minimum over every statement pattern / external set found inside it.
     */
    // TODO currently computes minimum selectivity of sp1 with each node in TupleExpr te (is this best?)
    private double getSpJoinSelect(RdfCloudTripleStoreConfiguration conf, TupleExpr te, StatementPattern sp1)
            throws TableNotFoundException {

        if (te instanceof StatementPattern) {
            return getJoinSelect(conf, (StatementPattern) te, sp1);
        }

        SpExternalCollector collector = new SpExternalCollector();
        te.visit(collector);
        List<QueryModelNode> espList = collector.getSpExtTup();

        if (espList.size() == 0) {
            // Nothing to look up: fall back to a fixed factor per shared binding name.
            return bindingOverlapSelectivity(te.getAssuredBindingNames(), sp1.getAssuredBindingNames());
        }

        double min = Double.MAX_VALUE;
        double select = Double.MAX_VALUE;

        for (QueryModelNode node : espList) {
            if (node instanceof StatementPattern) {
                select = getJoinSelect(conf, sp1, (StatementPattern) node);
            } else if (node instanceof ExternalSet) {
                select = getExtJoinSelect(sp1, (ExternalSet) node);
            }
            if (min > select) {
                min = select;
            }
        }
        return min;
    }

    /**
     * Estimates the selectivity of joining two tuple expressions as the minimum selectivity of
     * {@code te1} against every statement pattern or external set contained in {@code te2}.
     *
     * @return the minimum estimate, or {@link Double#MAX_VALUE} when {@code te2} contains neither
     *         statement patterns nor external sets
     */
    public double getJoinSelect(RdfCloudTripleStoreConfiguration conf, TupleExpr te1, TupleExpr te2) throws TableNotFoundException {

        SpExternalCollector collector = new SpExternalCollector();
        te2.visit(collector);

        double min = Double.MAX_VALUE;
        for (QueryModelNode node : collector.getSpExtTup()) {
            double select = getSelectivity(conf, te1, node);
            if (min > select) {
                min = select;
            }
        }
        return min;
    }

    /** Dispatches on the node type; nodes that are neither pattern nor external set yield 0. */
    private double getSelectivity(RdfCloudTripleStoreConfiguration conf, TupleExpr te, QueryModelNode node) throws TableNotFoundException {
        if (node instanceof StatementPattern) {
            return getSpJoinSelect(conf, te, (StatementPattern) node);
        } else if (node instanceof ExternalSet) {
            return getExtJoinSelect(te, (ExternalSet) node);
        } else {
            return 0;
        }
    }

    /** Estimates a join with an external set purely from the number of shared binding names. */
    private double getExtJoinSelect(TupleExpr te, ExternalSet eSet) {
        return bindingOverlapSelectivity(te.getAssuredBindingNames(), eSet.getAssuredBindingNames());
    }

    /** Returns {@code (1/10000)^n} where {@code n} is the number of names common to both sets. */
    private static double bindingOverlapSelectivity(Set<String> left, Set<String> right) {
        Set<String> intersect = Sets.intersection(left, right);
        return Math.pow(PER_BINDING_SELECTIVITY, intersect.size());
    }

    /**
     * Obtains the cardinality for a statement pattern from the wrapped {@link RdfEvalStatsDAO}.
     * Composite cardinalities (subject+predicate, subject+object, predicate+object) are used when
     * two constants are available.
     *
     * @return 0 when the statistics DAO reports a negative value for the constants, the table size
     *         for an all-variable pattern, otherwise the reported cardinality capped at twice the
     *         table size
     * @throws RuntimeException wrapping any failure of the statistics lookup
     */
    @Override
    public long getCardinality(RdfCloudTripleStoreConfiguration conf, StatementPattern sp) throws TableNotFoundException {

        Resource subj = (Resource) getConstantValue(sp.getSubjectVar());
        URI pred = (URI) getConstantValue(sp.getPredicateVar());
        org.openrdf.model.Value obj = getConstantValue(sp.getObjectVar());
        Resource context = (Resource) getConstantValue(sp.getContextVar());

        /**
         * We put full triple scans before rdf:type because more often than not the triple scan is being joined with something else that is better than asking the
         * full rdf:type of everything.
         */
        double cardinality = 0;
        try {
            // Multiply as double: 2 * int overflows once the table holds more than 2^30 triples.
            cardinality = 2.0 * getTableSize(conf);
        } catch (Exception e1) {
            // Best effort: without a table size the upper bound stays at 0.
            e1.printStackTrace();
        }

        try {
            List<org.openrdf.model.Value> values = new ArrayList<org.openrdf.model.Value>();
            CARDINALITY_OF card = null;

            if (subj != null) {
                values.add(subj);
                card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECT;
                if (pred != null) {
                    values.add(pred);
                    card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECTPREDICATE;
                } else if (obj != null) {
                    values.add(obj);
                    card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECTOBJECT;
                }
            } else if (pred != null) {
                values.add(pred);
                card = RdfEvalStatsDAO.CARDINALITY_OF.PREDICATE;
                if (obj != null) {
                    values.add(obj);
                    card = RdfEvalStatsDAO.CARDINALITY_OF.PREDICATEOBJECT;
                }
            } else if (obj != null) {
                values.add(obj);
                card = RdfEvalStatsDAO.CARDINALITY_OF.OBJECT;
            }

            if (card == null) {
                // No constants at all: the pattern matches the whole table.
                cardinality = getTableSize(conf);
            } else {
                double evalCard = this.getCardinality(conf, card, values, context);
                // the cardinality will be -1 if there was no value found (if
                // the index does not exist)
                if (evalCard >= 0) {
                    cardinality = Math.min(cardinality, evalCard);
                } else {
                    // TODO change this to agree with prospector
                    cardinality = 0;
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // TODO is this okay?
        return (long) cardinality;
    }

    /** Returns the value bound to {@code var}, or {@code null} when {@code var} itself is null. */
    private org.openrdf.model.Value getConstantValue(Var var) {
        return var != null ? var.getValue() : null;
    }

    public double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<org.openrdf.model.Value> val) throws RdfDAOException {
        return resd.getCardinality(conf, card, val);
    }

    public double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<org.openrdf.model.Value> val, Resource context) throws RdfDAOException {
        return resd.getCardinality(conf, card, val, context);
    }

    /**
     * Returns the total triple count stored in the selectivity table, reading it on first use and
     * reusing the remembered value afterwards.
     *
     * @throws RuntimeException if the table holds no (non-zero) full table cardinality entry
     */
    public int getTableSize(RdfCloudTripleStoreConfiguration conf) throws TableNotFoundException {

        Authorizations authorizations = getAuths(conf);

        if (joinMap.containsKey(FULL_TABLE_ROW)) {
            fullTableCardinality = joinMap.get(FULL_TABLE_ROW).intValue();
            return fullTableCardinality;
        }

        if (fullTableCardinality == 0) {
            Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations);
            joinScanner.setRange(Range.prefix(new Text(FULL_TABLE_ROW)));
            Iterator<Map.Entry<Key,Value>> iterator = joinScanner.iterator();
            if (iterator.hasNext()) {
                Map.Entry<Key,Value> entry = iterator.next();
                if (entry.getKey().getColumnFamily().toString().equals(FULL_TABLE_FAMILY)) {
                    // The count is stored as text in the column qualifier.
                    String count = entry.getKey().getColumnQualifier().toString();
                    fullTableCardinality = Integer.parseInt(count);
                }
            }
            if (fullTableCardinality == 0) {
                throw new RuntimeException("Table does not contain full cardinality");
            }
        }

        return fullTableCardinality;
    }

    /** Builds scan authorizations from the configuration; empty authorizations when none are set. */
    private Authorizations getAuths(RdfCloudTripleStoreConfiguration conf) {
        String[] auths = conf.getAuths();
        if (auths == null || auths.length == 0) {
            return new Authorizations();
        }
        return new Authorizations(auths);
    }

    /** Collects every {@link StatementPattern} and {@link ExternalSet} reachable from a node. */
    private static class SpExternalCollector extends QueryModelVisitorBase<RuntimeException> {

        private final List<QueryModelNode> eSet = Lists.newArrayList();

        @Override
        public void meetNode(QueryModelNode node) throws RuntimeException {
            if (node instanceof ExternalSet || node instanceof StatementPattern) {
                eSet.add(node);
            }
            super.meetNode(node);
        }

        public List<QueryModelNode> getSpExtTup() {
            return eSet;
        }

    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java new file mode 100644 index 0000000..f49ba13 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java @@ -0,0 +1,266 @@
package mvm.rya.joinselect;

/*
 * #%L
 * mvm.rya.rya.prospector
 * %%
 * Copyright (C) 2014 - 2015 Rya
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.ArrayList;
import java.util.List;

import org.apache.accumulo.core.data.Key;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;

/**
 * Static helpers that translate statement patterns into the row names and join-type names used
 * to look up join cardinalities.
 */
public class CardinalityCalcUtil {

    private static final String DELIM = "\u0000";

    private static final String SUBJECT = "subject";
    private static final String PREDICATE = "predicate";
    private static final String OBJECT = "object";

    /**
     * Maps a position index (0, 1, 2) to its triple place name.
     *
     * @throws IllegalArgumentException for any other index
     */
    private static String intToTriplePlace(int i) {
        switch (i) {
            case 0:
                return SUBJECT;
            case 1:
                return PREDICATE;
            case 2:
                return OBJECT;
            default:
                throw new IllegalArgumentException("Invalid integer triple place.");
        }
    }

    /**
     * Maps a triple place name back to its position index.
     *
     * @throws IllegalArgumentException if the name is not subject, predicate or object
     */
    private static int triplePlaceToInt(String s) {
        if (s.equals(SUBJECT)) {
            return 0;
        } else if (s.equals(PREDICATE)) {
            return 1;
        } else if (s.equals(OBJECT)) {
            return 2;
        }
        throw new IllegalArgumentException("Invalid triple place.");
    }

    /**
     * Lists the triple places (in subject, predicate, object order) whose "is constant" state
     * equals {@code constant}. Only the first three entries of the pattern's var list are examined.
     */
    private static List<String> getPositions(StatementPattern sp, boolean constant) {
        List<String> posList = new ArrayList<String>();
        List<Var> varList = sp.getVarList();
        for (int i = 0; i < 3; i++) {
            if (varList.get(i).isConstant() == constant) {
                posList.add(intToTriplePlace(i));
            }
        }
        return posList;
    }

    /** Returns the places of {@code sp} that hold variables. */
    private static List<String> getVariablePos(StatementPattern sp) {
        return getPositions(sp, false);
    }

    /** Returns the places of {@code sp} that hold constants. */
    private static List<String> getConstantPos(StatementPattern sp) {
        return getPositions(sp, true);
    }

    /**
     * Builds the lookup row for a statement pattern: the concatenated names of its constant
     * places, a delimiter, and the constant values joined by the delimiter.
     *
     * <p>
     * When {@code joinTable} is set and the constants are subject followed by object, both the
     * name and the values are emitted in object-subject order.
     *
     * @param sp the pattern; assumed to contain at most two constants
     * @param joinTable whether the subject/object reordering applies
     * @return {@code placeNames + DELIM + values}
     */
    // TODO might not be good if all variable sp is needed to get table size
    public static String getRow(StatementPattern sp, boolean joinTable) {

        String row = "";
        String values = "";
        List<Var> varList = sp.getVarList();

        for (String place : getConstantPos(sp)) {
            int index = triplePlaceToInt(place);
            String value = removeQuotes(varList.get(index).getValue().toString());
            boolean prepend = row.equals(SUBJECT) && place.equals(OBJECT) && joinTable;

            row = prepend ? place + row : row + place;

            if (values.length() == 0) {
                values = value;
            } else if (prepend) {
                values = value + DELIM + values;
            } else {
                values = values + DELIM + value;
            }
        }

        return row + DELIM + values;
    }

    /**
     * Trims the string and, when it starts with a double quote, drops its first and last
     * character.
     *
     * <p>
     * Strings too short to hold a quoted value (empty, or a lone quote) are returned trimmed but
     * otherwise untouched instead of raising {@link StringIndexOutOfBoundsException}.
     */
    private static String removeQuotes(String s) {
        String trimmed = s.trim();
        if (trimmed.length() >= 2 && trimmed.charAt(0) == '"') {
            return trimmed.substring(1, trimmed.length() - 1);
        }
        return trimmed;
    }

    /**
     * Reads a join cardinality from a key.
     *
     * @return the key's column qualifier parsed as a long
     * @throws NumberFormatException if the qualifier is not a decimal number
     */
    public static long getJCard(Key key) {
        return Long.parseLong(key.getColumnQualifier().toString());
    }

    /**
     * Determines the positions in which two statement patterns have a common variable. For every
     * matching pair the place in {@code sp1} is added followed by the place in {@code sp2}; a
     * result of exactly four entries is passed through {@link #orderJoinType(List)}.
     */
    private static List<String> getJoinType(StatementPattern sp1, StatementPattern sp2) {

        List<String> joinList = new ArrayList<String>();
        List<Var> vars1 = sp1.getVarList();
        List<Var> vars2 = sp2.getVarList();
        List<String> pos2 = getVariablePos(sp2);

        for (String s : getVariablePos(sp1)) {
            String name1 = vars1.get(triplePlaceToInt(s)).getName();
            for (String t : pos2) {
                if (name1.equals(vars2.get(triplePlaceToInt(t)).getName())) {
                    joinList.add(s);
                    joinList.add(t);
                }
            }
        }

        if (joinList.size() == 4) {
            return orderJoinType(joinList);
        }
        return joinList;
    }

    /** Returns a new list holding the elements of {@code source} at the given indices, in order. */
    private static List<String> pick(List<String> source, int a, int b, int c, int d) {
        List<String> picked = new ArrayList<String>(4);
        picked.add(source.get(a));
        picked.add(source.get(b));
        picked.add(source.get(c));
        picked.add(source.get(d));
        return picked;
    }

    /**
     * Regroups a four-element list from {@code [0,1,2,3]} to {@code [0,2,1,3]}, or to
     * {@code [2,0,3,1]} when element 0 is "subject" and element 2 is "object".
     */
    // assumes list size is four
    private static List<String> orderJoinType(List<String> jList) {
        if (jList.get(0).equals(SUBJECT) && jList.get(2).equals(OBJECT)) {
            return pick(jList, 2, 0, 3, 1);
        }
        return pick(jList, 0, 2, 1, 3);
    }

    /**
     * Swaps the two halves of a four-element list. The second half is additionally reversed (and
     * the first half with it) when it reads subject/object, predicate/subject or
     * object/predicate.
     */
    // assumes size is four
    private static List<String> reverseJoinType(List<String> jList) {
        String third = jList.get(2);
        String fourth = jList.get(3);

        boolean flip = (third.equals(SUBJECT) && fourth.equals(OBJECT))
                || (third.equals(PREDICATE) && fourth.equals(SUBJECT))
                || (third.equals(OBJECT) && fourth.equals(PREDICATE));

        if (flip) {
            return pick(jList, 3, 2, 1, 0);
        }
        return pick(jList, 2, 3, 0, 1);
    }

    /**
     * Builds the join-type names for a pair of statement patterns.
     *
     * @return an empty list when the patterns share no variable; two names for a single shared
     *         variable (one per join direction); otherwise four names, the first two for
     *         {@code sp1} and the last two for {@code sp2}
     */
    public static List<String> getJoins(StatementPattern sp1, StatementPattern sp2) {
        List<String> jList = new ArrayList<String>();
        List<String> list = getJoinType(sp1, sp2);

        if (list.size() == 0) {
            return list;
        }

        if (list.size() == 2) {
            jList.add(list.get(0) + list.get(1));
            jList.add(list.get(1) + list.get(0));
            return jList;
        }

        // NOTE(review): getJoinType already orders four-element lists, so this second call
        // re-orders them - confirm against the job that writes the table before changing.
        list = orderJoinType(list);
        jList.add(list.get(0) + list.get(1) + list.get(2) + list.get(3));
        jList.add(list.get(0) + list.get(1) + list.get(3) + list.get(2));
        list = reverseJoinType(list);
        jList.add(list.get(0) + list.get(1) + list.get(2) + list.get(3));
        jList.add(list.get(0) + list.get(1) + list.get(3) + list.get(2));
        return jList;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java
---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java new file mode 100644 index 0000000..c84130b --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java @@ -0,0 +1,128 @@
package mvm.rya.joinselect.mr;

/*
 * #%L
 * mvm.rya.rya.prospector
 * %%
 * Copyright (C) 2014 - 2015 Rya
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS;
import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SELECTIVITY_TABLE;
import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_TABLE;

import java.io.IOException;

import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that counts the entries of the input table and writes the total as a single
 * mutation to the output table, under the row
 * {@code "subjectpredicateobject" + DELIM + "FullTableCardinality"}.
 */
public class FullTableSize extends Configured implements Tool {

    private static final String DELIM = "\u0000";

    /**
     * Runs the job through {@link ToolRunner} and exits with the job's status when it is
     * non-zero, so a failed job is visible to the invoking shell.
     */
    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new FullTableSize(), args);
        if (status != 0) {
            System.exit(status);
        }
    }

    /** Emits {@code ("COUNT", 1)} for every input entry. */
    public static class FullTableMapper extends Mapper<Key,Value,Text,IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
            context.write(new Text("COUNT"), ONE);
        }
    }

    /**
     * Sums the partial counts and writes the total as the column qualifier of a
     * {@code FullTableCardinality} entry with an empty value.
     */
    public static class FullTableReducer extends Reducer<Text,IntWritable,Text,Mutation> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // NOTE(review): the count is an int end to end (mapper, combiner, reducer), so it
            // wraps for tables with more than Integer.MAX_VALUE entries - confirm expected sizes.
            int count = 0;
            for (IntWritable partial : values) {
                count += partial.get();
            }

            Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality"));
            m.put(new Text("FullTableCardinality"), new Text(Integer.toString(count)), new Value(new byte[0]));

            context.write(new Text(""), m);
        }
    }

    /** Pre-aggregates the per-entry ones into partial sums under the same key. */
    public static class FullTableCombiner extends Reducer<Text,IntWritable,Text,IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable partial : values) {
                count += partial.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    /**
     * Configures and runs the counting job with a single reducer.
     *
     * @return 0 when the job succeeds, 1 otherwise
     * @throws IllegalArgumentException if the input or output table name is missing from the
     *         configuration
     */
    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        String inTable = conf.get(SPO_TABLE);
        String outTable = conf.get(SELECTIVITY_TABLE);
        String auths = conf.get(AUTHS);

        // Explicit check: assert statements are skipped unless the JVM runs with -ea.
        if (inTable == null || outTable == null) {
            throw new IllegalArgumentException("Both " + SPO_TABLE + " and " + SELECTIVITY_TABLE + " must be set in the job configuration");
        }

        Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
        job.setJarByClass(this.getClass());
        JoinSelectStatsUtil.initTableMRJob(job, inTable, outTable, auths);
        job.setMapperClass(FullTableMapper.class);
        job.setCombinerClass(FullTableCombiner.class);
        job.setReducerClass(FullTableReducer.class);
        job.setNumReduceTasks(1);

        job.waitForCompletion(true);

        return job.isSuccessful() ? 0 : 1;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java new file mode 100644 index 0000000..165b18d --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java @@ -0,0 +1,270 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.OUTPUTPATH; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_OUTPUTPATH; + +import java.io.IOException; + +import mvm.rya.joinselect.mr.utils.CardList; +import mvm.rya.joinselect.mr.utils.CardinalityType; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; +import mvm.rya.joinselect.mr.utils.TripleCard; +import mvm.rya.joinselect.mr.utils.TripleEntry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; + +public class JoinSelectAggregate extends Configured implements Tool { + + public static class JoinSelectAggregateMapper extends Mapper<CompositeType,TripleCard,CompositeType,TripleCard> { + + public void map(CompositeType key, TripleCard value, Context context) throws IOException, InterruptedException { + + context.write(key, value); + + } + + } + + public static class JoinReducer extends Reducer<CompositeType,TripleCard,TripleEntry,CardList> { + + public void reduce(CompositeType key, 
Iterable<TripleCard> values, Context context) throws IOException, InterruptedException { + + CardinalityType card; + TripleEntry triple; + CardinalityType subjectCard = null; + CardinalityType objectCard = null; + CardinalityType predicateCard = null; + CardinalityType spCard = null; + CardinalityType soCard = null; + CardinalityType poCard = null; + CardList cList = new CardList((long) 0, (long) 0, (long) 0, (long) 0, (long) 0, (long) 0); + boolean listEmpty = true; + + // System.out.println("********************************************************************"); + // System.out.println("Key is " + key ); + + for (TripleCard val : values) { + + // System.out.println("Value in iterable is " + val); + if (!val.isCardNull()) { + card = val.getCard(); + + if (card.getCardType().toString().equals("object")) { + if (objectCard == null) { + objectCard = new CardinalityType(); + objectCard.set(card); + + } else if (objectCard.compareTo(card) > 0) { + // System.out.println(objectCard.compareTo(card)); + objectCard.set(card); + + } + + } else if (card.getCardType().toString().equals("predicate")) { + // System.out.println("Coming in here?"); + if (predicateCard == null) { + predicateCard = new CardinalityType(); + predicateCard.set(card); + + } else if (predicateCard.compareTo(card) > 0) { + predicateCard.set(card); + + } + } else if (card.getCardType().toString().equals("subject")) { + if (subjectCard == null) { + subjectCard = new CardinalityType(); + subjectCard.set(card); + + } else if (subjectCard.compareTo(card) > 0) { + subjectCard.set(card); + } + + } else if (card.getCardType().toString().equals("subjectpredicate")) { + if (spCard == null) { + spCard = new CardinalityType(); + spCard.set(card); + + } else if (spCard.compareTo(card) > 0) { + spCard.set(card); + + } + } else if (card.getCardType().toString().equals("subjectobject")) { + if (soCard == null) { + soCard = new CardinalityType(); + soCard.set(card); + + } else if (soCard.compareTo(card) > 0) { + 
soCard.set(card); + + } + } else if (card.getCardType().toString().equals("predicateobject")) { + if (poCard == null) { + poCard = new CardinalityType(); + poCard.set(card); + + } else if (poCard.compareTo(card) > 0) { + poCard.set(card); + + } + } + + } else { + + if (listEmpty) { + if (subjectCard != null || predicateCard != null || objectCard != null) { + + if (subjectCard != null) { + cList.setSCard(subjectCard.getCard().get()); + } + if (predicateCard != null) { + cList.setPCard(predicateCard.getCard().get()); + } + if (objectCard != null) { + cList.setOCard(objectCard.getCard().get()); + } + + listEmpty = false; + + } else if (spCard != null || poCard != null || soCard != null) { + + if (spCard != null) { + cList.setSPCard(spCard.getCard().get()); + } + if (poCard != null) { + cList.setPOCard(poCard.getCard().get()); + } + if (soCard != null) { + cList.setSOCard(soCard.getCard().get()); + } + + listEmpty = false; + } + + // System.out.println("Cardlist is " + cList); + // System.out.println("Cards are " + + // subjectCard.getCard() + "," + predicateCard.getCard() + // + + // "," + objectCard.getCard() + "," + spCard.getCard() + + // "," + poCard.getCard() + "," + soCard.getCard()); + // + } + + // only write record if cardList contains at least one + // nonzero entry + if (!val.isTeNull() && !listEmpty) { + + triple = (TripleEntry) val.getTE(); + + context.write(triple, cList); + // System.out.println("Triple is " + triple + + // " and cardinality is " + cList); + + } + + } + } + + } + + } + + public static class JoinSelectPartitioner extends Partitioner<CompositeType,TripleCard> { + + @Override + public int getPartition(CompositeType key, TripleCard value, int numPartitions) { + return Math.abs(key.getOldKey().hashCode() * 127) % numPartitions; + } + + } + + public static class JoinSelectGroupComparator extends WritableComparator { + + protected JoinSelectGroupComparator() { + super(CompositeType.class, true); + } + + @SuppressWarnings("rawtypes") + 
@Override + public int compare(WritableComparable w1, WritableComparable w2) { + CompositeType ct1 = (CompositeType) w1; + CompositeType ct2 = (CompositeType) w2; + return ct1.getOldKey().compareTo(ct2.getOldKey()); + } + + } + + public static class JoinSelectSortComparator extends WritableComparator { + + protected JoinSelectSortComparator() { + super(CompositeType.class, true); + } + + @SuppressWarnings("rawtypes") + @Override + public int compare(WritableComparable w1, WritableComparable w2) { + CompositeType ct1 = (CompositeType) w1; + CompositeType ct2 = (CompositeType) w2; + return ct1.compareTo(ct2); + } + + } + + @Override + public int run(String[] args) throws Exception { + Configuration conf = getConf(); + String inPath1 = conf.get(PROSPECTS_OUTPUTPATH); + String inPath2 = conf.get(SPO_OUTPUTPATH); + String auths = conf.get(AUTHS); + String outPath = conf.get(OUTPUTPATH); + + assert inPath1 != null && inPath2 != null && outPath != null; + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + job.setUserClassesTakesPrecedence(true); + + JoinSelectStatsUtil.initJoinMRJob(job, inPath1, inPath2, JoinSelectAggregateMapper.class, outPath, auths); + + job.setSortComparatorClass(JoinSelectSortComparator.class); + job.setGroupingComparatorClass(JoinSelectGroupComparator.class); + job.setPartitionerClass(JoinSelectPartitioner.class); + job.setReducerClass(JoinReducer.class); + job.setNumReduceTasks(32); + job.waitForCompletion(true); + + return job.isSuccessful() ? 
0 : 1; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java new file mode 100644 index 0000000..c97954d --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java @@ -0,0 +1,59 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class JoinSelectDriver extends Configured implements Tool { + + public static void main(String[] args) throws Exception { + ToolRunner.run(new JoinSelectDriver(), args); + } + + @Override + public int run(String[] args) throws Exception { + + Configuration conf = getConf(); + System.out.println("Zookeepers are " + conf.get("zookeepers")); + + int res; + res = ToolRunner.run(conf, new FullTableSize(), args); + + if (res == 0) { + res = ToolRunner.run(conf, new JoinSelectSpoTableOutput(), args); + } + if (res == 0) { + res = ToolRunner.run(conf, new JoinSelectProspectOutput(), args); + } + if (res == 0) { + res = ToolRunner.run(conf, new JoinSelectAggregate(), args); + } + if (res == 0) { + res = ToolRunner.run(conf, new JoinSelectStatisticsSum(), args); + } + + return res; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java new file mode 100644 index 0000000..23838d3 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java @@ -0,0 +1,122 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_TABLE; + +import java.io.IOException; +import java.util.regex.Pattern; + +import mvm.rya.joinselect.mr.utils.CardinalityType; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; +import mvm.rya.joinselect.mr.utils.TripleCard; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; + +public class JoinSelectProspectOutput extends Configured implements Tool { + + public static class CardinalityMapper extends Mapper<Key,Value,CompositeType,TripleCard> { + + private static final String DELIM = "\u0000"; + + Text inText = new Text(); + Pattern splitPattern = Pattern.compile(DELIM); + + public void map(Key key, Value data, Context context) throws IOException, InterruptedException { + + key.getRow(inText); + String[] cardData = splitPattern.split(inText.toString().trim(), 4); + // System.out.println("Card data is " + cardData[0] + ", "+ 
cardData[1] + ", "+ cardData[2]); + if (cardData.length == 3 && ((cardData[0].equals("subject")) || (cardData[0].equals("object")) || (cardData[0].equals("predicate")))) { + Text tripleValType = new Text(cardData[0]); + Text cardKey = new Text(cardData[1]); + LongWritable ts = new LongWritable(Long.valueOf(cardData[2])); + + String s = new String(data.get()); + LongWritable card = new LongWritable(Long.parseLong(s)); + + CompositeType cType = new CompositeType(cardKey, new IntWritable(1)); + TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts)); + + context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts))); + // System.out.println("Card mapper output key is " + cType + " and value is " + tCard ); + + } else if (cardData.length == 4 + && ((cardData[0].equals("subjectpredicate")) || (cardData[0].equals("subjectobject")) || (cardData[0].equals("predicateobject")))) { + + Text tripleValType = new Text(cardData[0]); + Text cardKey = new Text(cardData[1] + DELIM + cardData[2]); + LongWritable ts = new LongWritable(Long.valueOf(cardData[3])); + + String s = new String(data.get()); + LongWritable card = new LongWritable(Long.parseLong(s)); + + CompositeType cType = new CompositeType(cardKey, new IntWritable(1)); + TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts)); + + context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts))); + // System.out.println("Card mapper output key is " + cType + " and value is " + tCard ); + + } + + } + + } + + @Override + public int run(String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException { + + Configuration conf = getConf(); + String inTable = conf.get(PROSPECTS_TABLE); + String auths = conf.get(AUTHS); + String outPath = conf.get(PROSPECTS_OUTPUTPATH); + + assert inTable != null && outPath != null; + + Job 
job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + job.setUserClassesTakesPrecedence(true); + + JoinSelectStatsUtil.initTabToSeqFileJob(job, inTable, outPath, auths); + job.setMapperClass(CardinalityMapper.class); + + job.setNumReduceTasks(0); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 0 : 1; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectSpoTableOutput.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectSpoTableOutput.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectSpoTableOutput.java new file mode 100644 index 0000000..f968572 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectSpoTableOutput.java @@ -0,0 +1,124 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_OUTPUTPATH; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_TABLE; + +import java.io.IOException; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.joinselect.mr.utils.CompositeType; +import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; +import mvm.rya.joinselect.mr.utils.TripleCard; +import mvm.rya.joinselect.mr.utils.TripleEntry; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.Tool; + +public class JoinSelectSpoTableOutput extends Configured implements Tool { + + public static class JoinSelectMapper extends Mapper<Key,Value,CompositeType,TripleCard> { + + private RyaTripleContext ryaContext; + private static final String DELIM = "\u0000"; + + public void map(Key row, Value data, Context context) throws IOException, InterruptedException { + try { + ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration())); + RyaStatement ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, new TripleRow(row.getRow().getBytes(), row + .getColumnFamily().getBytes(), row.getColumnQualifier().getBytes(), row.getTimestamp(), row.getColumnVisibility().getBytes(), data.get())); + + Text s = new Text(ryaStatement.getSubject().getData()); + Text p = new 
Text(ryaStatement.getPredicate().getData()); + Text o = new Text(ryaStatement.getObject().getData()); + Text sp = new Text(ryaStatement.getSubject().getData() + DELIM + ryaStatement.getPredicate().getData()); + Text po = new Text(ryaStatement.getPredicate().getData() + DELIM + ryaStatement.getObject().getData()); + Text so = new Text(ryaStatement.getSubject().getData() + DELIM + ryaStatement.getObject().getData()); + Text ps = new Text(ryaStatement.getPredicate().getData() + DELIM + ryaStatement.getSubject().getData()); + Text op = new Text(ryaStatement.getObject().getData() + DELIM + ryaStatement.getPredicate().getData()); + Text os = new Text(ryaStatement.getObject().getData() + DELIM + ryaStatement.getSubject().getData()); + + TripleEntry t1 = new TripleEntry(s, p, new Text("subject"), new Text("predicate"), new Text("object")); + TripleEntry t2 = new TripleEntry(p, o, new Text("predicate"), new Text("object"), new Text("subject")); + TripleEntry t3 = new TripleEntry(o, s, new Text("object"), new Text("subject"), new Text("predicate")); + TripleEntry t4 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("predicateobject")); + TripleEntry t5 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("objectsubject")); + TripleEntry t6 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("subjectpredicate")); + TripleEntry t7 = new TripleEntry(s, new Text(""), new Text("subject"), new Text(""), new Text("objectpredicate")); + TripleEntry t8 = new TripleEntry(p, new Text(""), new Text("predicate"), new Text(""), new Text("subjectobject")); + TripleEntry t9 = new TripleEntry(o, new Text(""), new Text("object"), new Text(""), new Text("predicatesubject")); + + context.write(new CompositeType(o, new IntWritable(2)), new TripleCard(t1)); + context.write(new CompositeType(s, new IntWritable(2)), new TripleCard(t2)); + context.write(new CompositeType(p, new IntWritable(2)), new TripleCard(t3)); + 
context.write(new CompositeType(po, new IntWritable(2)), new TripleCard(t4)); + context.write(new CompositeType(so, new IntWritable(2)), new TripleCard(t5)); + context.write(new CompositeType(sp, new IntWritable(2)), new TripleCard(t6)); + context.write(new CompositeType(op, new IntWritable(2)), new TripleCard(t7)); + context.write(new CompositeType(os, new IntWritable(2)), new TripleCard(t8)); + context.write(new CompositeType(ps, new IntWritable(2)), new TripleCard(t9)); + + } catch (TripleRowResolverException e) { + e.printStackTrace(); + } + + } + + } + + @Override + public int run(String[] args) throws Exception { + + Configuration conf = getConf(); + String inTable = conf.get(SPO_TABLE); + String auths = conf.get(AUTHS); + String outPath = conf.get(SPO_OUTPUTPATH); + + assert inTable != null && outPath != null; + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + job.setUserClassesTakesPrecedence(true); + + JoinSelectStatsUtil.initTabToSeqFileJob(job, inTable, outPath, auths); + job.setMapperClass(JoinSelectMapper.class); + job.setNumReduceTasks(0); + job.waitForCompletion(true); + + return job.isSuccessful() ? 
0 : 1; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectStatisticsSum.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectStatisticsSum.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectStatisticsSum.java new file mode 100644 index 0000000..c972b83 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectStatisticsSum.java @@ -0,0 +1,219 @@ +package mvm.rya.joinselect.mr; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.INPUTPATH; +import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SELECTIVITY_TABLE; + +import java.io.IOException; + +import mvm.rya.joinselect.mr.utils.CardList; +import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; +import mvm.rya.joinselect.mr.utils.TripleEntry; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; + +public class JoinSelectStatisticsSum extends Configured implements Tool { + + // TODO need to tweak this class to compute join cardinalities over more than one variable + + public static class CardinalityIdentityMapper extends Mapper<TripleEntry,CardList,TripleEntry,CardList> { + + public void map(TripleEntry key, CardList value, Context context) throws IOException, InterruptedException { + + // System.out.println("Keys are " + key + " and values are " + value); + + if (key.getSecond().toString().length() != 0 && key.getSecondPos().toString().length() != 0) { + TripleEntry te1 = new TripleEntry(key.getFirst(), new Text(""), key.getFirstPos(), new Text(""), key.getKeyPos()); + TripleEntry te2 = new TripleEntry(key.getSecond(), new Text(""), key.getSecondPos(), new Text(""), key.getKeyPos()); + + context.write(te1, value); + context.write(te2, value); + context.write(key, value); + // System.out.println("Output key values from mapper are " + te1 + " and " + value + "\n" + // + te2 + " and " + value + "\n" + key + " and " + value + "\n"); + } else if 
(key.getSecond().toString().length() == 0 && key.getSecondPos().toString().length() == 0) { + + context.write(key, value); + // System.out.println("Output key values from mapper are " + "\n" + key + " and " + value + "\n" + "\n"); + } + + } + + } + + public static class CardinalityIdentityReducer extends Reducer<TripleEntry,CardList,Text,Mutation> { + + private static final String DELIM = "\u0000"; + + public void reduce(TripleEntry te, Iterable<CardList> values, Context context) throws IOException, InterruptedException { + + CardList cl = new CardList(); + LongWritable s = new LongWritable(0); + LongWritable p = new LongWritable(0); + LongWritable o = new LongWritable(0); + LongWritable sp = new LongWritable(0); + LongWritable po = new LongWritable(0); + LongWritable so = new LongWritable(0); + + // System.out.println("***********************************************************\n" + // + "key is " + te); + + for (CardList val : values) { + // System.out.println("Value is " + val); + s.set(s.get() + val.getcardS().get()); + p.set(p.get() + val.getcardP().get()); + o.set(o.get() + val.getcardO().get()); + sp.set(sp.get() + val.getcardSP().get()); + po.set(po.get() + val.getcardPO().get()); + so.set(so.get() + val.getcardSO().get()); + } + cl.setCard(s, p, o, sp, po, so); + + Text row; + + if (te.getSecond().toString().length() > 0) { + row = new Text(te.getFirstPos().toString() + te.getSecondPos().toString() + DELIM + te.getFirst().toString() + DELIM + te.getSecond()); + } else { + row = new Text(te.getFirstPos().toString() + DELIM + te.getFirst().toString()); + } + + Mutation m1, m2, m3; + + if (te.getKeyPos().toString().equals("subject") || te.getKeyPos().toString().equals("predicate") || te.getKeyPos().toString().equals("object")) { + m1 = new Mutation(row); + m1.put(new Text(te.getKeyPos().toString() + "subject"), new Text(cl.getcardS().toString()), new Value(new byte[0])); + m2 = new Mutation(row); + m2.put(new Text(te.getKeyPos().toString() + "predicate"), new 
Text(cl.getcardP().toString()), new Value(new byte[0])); + m3 = new Mutation(row); + m3.put(new Text(te.getKeyPos().toString() + "object"), new Text(cl.getcardO().toString()), new Value(new byte[0])); + + } else if (te.getKeyPos().toString().equals("predicatesubject") || te.getKeyPos().toString().equals("objectpredicate") + || te.getKeyPos().toString().equals("subjectobject")) { + + String jOrder = reverseJoinOrder(te.getKeyPos().toString()); + + m1 = new Mutation(row); + m1.put(new Text(jOrder + "predicatesubject"), new Text(cl.getcardSP().toString()), new Value(new byte[0])); + m2 = new Mutation(row); + m2.put(new Text(jOrder + "objectpredicate"), new Text(cl.getcardPO().toString()), new Value(new byte[0])); + m3 = new Mutation(row); + m3.put(new Text(jOrder + "subjectobject"), new Text(cl.getcardSO().toString()), new Value(new byte[0])); + + } else { + + m1 = new Mutation(row); + m1.put(new Text(te.getKeyPos().toString() + "subjectpredicate"), new Text(cl.getcardSP().toString()), new Value(new byte[0])); + m2 = new Mutation(row); + m2.put(new Text(te.getKeyPos().toString() + "predicateobject"), new Text(cl.getcardPO().toString()), new Value(new byte[0])); + m3 = new Mutation(row); + m3.put(new Text(te.getKeyPos().toString() + "objectsubject"), new Text(cl.getcardSO().toString()), new Value(new byte[0])); + + } + + // TODO add the appropriate table name here + context.write(new Text(""), m1); + context.write(new Text(""), m2); + context.write(new Text(""), m3); + } + + private String reverseJoinOrder(String s) { + + if (s.equals("predicatesubject")) { + return "subjectpredicate"; + } else if (s.equals("objectpredicate")) { + return "predicateobject"; + } else if (s.equals("subjectobject")) { + return "objectsubject"; + } else { + throw new IllegalArgumentException("Invalid join type."); + } + + } + + } + + public static class CardinalityIdentityCombiner extends Reducer<TripleEntry,CardList,TripleEntry,CardList> { + + @Override + public void reduce(TripleEntry 
key, Iterable<CardList> values, Context context) throws IOException, InterruptedException { + + CardList cl = new CardList(); + LongWritable s = new LongWritable(0); + LongWritable p = new LongWritable(0); + LongWritable o = new LongWritable(0); + LongWritable sp = new LongWritable(0); + LongWritable po = new LongWritable(0); + LongWritable so = new LongWritable(0); + + for (CardList val : values) { + s.set(s.get() + val.getcardS().get()); + p.set(p.get() + val.getcardP().get()); + o.set(o.get() + val.getcardO().get()); + sp.set(sp.get() + val.getcardSP().get()); + po.set(po.get() + val.getcardPO().get()); + so.set(so.get() + val.getcardSO().get()); + } + + cl.setCard(s, p, o, sp, po, so); + context.write(key, cl); + + } + + } + + @Override + public int run(String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException { + + Configuration conf = getConf(); + String outTable = conf.get(SELECTIVITY_TABLE); + String auths = conf.get(AUTHS); + String inPath = conf.get(INPUTPATH); + + assert outTable != null && inPath != null; + + Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); + job.setJarByClass(this.getClass()); + JoinSelectStatsUtil.initSumMRJob(job, inPath, outTable, auths); + + job.setMapperClass(CardinalityIdentityMapper.class); + job.setCombinerClass(CardinalityIdentityCombiner.class); + job.setReducerClass(CardinalityIdentityReducer.class); + job.setNumReduceTasks(32); + + job.waitForCompletion(true); + + return job.isSuccessful() ? 
0 : 1; + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardList.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardList.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardList.java new file mode 100644 index 0000000..a0aa967 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardList.java @@ -0,0 +1,208 @@ +package mvm.rya.joinselect.mr.utils; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +public class CardList implements WritableComparable<CardList> { + + private LongWritable cardS; + private LongWritable cardP; + private LongWritable cardO; + private LongWritable cardSP; + private LongWritable cardPO; + private LongWritable cardSO; + + public CardList() { + cardS = new LongWritable(); + cardP = new LongWritable(); + cardO = new LongWritable(); + cardSP = new LongWritable(); + cardSO = new LongWritable(); + cardPO = new LongWritable(); + + } + + public CardList(long cardS, long cardP, long cardO, long cardSP, long cardPO, long cardSO) { + this.cardS = new LongWritable(cardS); + this.cardP = new LongWritable(cardP); + this.cardO = new LongWritable(cardO); + this.cardSP = new LongWritable(cardSP); + this.cardSO = new LongWritable(cardSO); + this.cardPO = new LongWritable(cardPO); + } + + public CardList(LongWritable cardS, LongWritable cardP, LongWritable cardO, LongWritable cardSP, LongWritable cardPO, LongWritable cardSO) { + + this.cardS = cardS; + this.cardP = cardP; + this.cardO = cardO; + this.cardSP = cardSP; + this.cardPO = cardPO; + this.cardSO = cardSO; + + } + + public void setCard(LongWritable cardS, LongWritable cardP, LongWritable cardO, LongWritable cardSP, LongWritable cardPO, LongWritable cardSO) { + this.cardS = cardS; + this.cardP = cardP; + this.cardO = cardO; + this.cardSP = cardSP; + this.cardPO = cardPO; + this.cardSO = cardSO; + + } + + public void setSCard(long cardS) { + this.cardS = new LongWritable(cardS); + } + + public void setPCard(long cardP) { + this.cardP = new LongWritable(cardP); + } + + public void setOCard(long cardO) { + this.cardO = new LongWritable(cardO); + } + + public void setSPCard(long cardSP) { + this.cardSP = new LongWritable(cardSP); + } + + public void setSOCard(long cardSO) { + this.cardSO = new 
LongWritable(cardSO); + } + + public void setPOCard(long cardPO) { + this.cardPO = new LongWritable(cardPO); + } + + public LongWritable getcardS() { + return this.cardS; + } + + public LongWritable getcardP() { + return this.cardP; + } + + public LongWritable getcardO() { + return this.cardO; + } + + public LongWritable getcardPO() { + return this.cardPO; + } + + public LongWritable getcardSO() { + return this.cardSO; + } + + public LongWritable getcardSP() { + return this.cardSP; + } + + @Override + public void write(DataOutput out) throws IOException { + cardS.write(out); + cardP.write(out); + cardO.write(out); + cardSO.write(out); + cardPO.write(out); + cardSP.write(out); + + } + + @Override + public void readFields(DataInput in) throws IOException { + cardS.readFields(in); + cardP.readFields(in); + cardO.readFields(in); + cardSO.readFields(in); + cardPO.readFields(in); + cardSP.readFields(in); + + } + + @Override + public int hashCode() { + int result = 7; + result = result * 17 + cardS.hashCode(); + result = result * 17 + cardP.hashCode(); + result = result * 17 + cardO.hashCode(); + result = result * 17 + cardSP.hashCode(); + result = result * 17 + cardPO.hashCode(); + result = result * 17 + cardSO.hashCode(); + + return result; + + } + + @Override + public boolean equals(Object o) { + if (o instanceof CardList) { + CardList comp = (CardList) o; + return cardS.equals(comp.cardS) && cardP.equals(comp.cardP) && cardO.equals(comp.cardO) && cardSP.equals(comp.cardSP) && cardSO.equals(comp.cardSO) + && cardPO.equals(comp.cardPO); + + } + return false; + } + + @Override + public String toString() { + return cardS + "\t" + cardP + "\t" + cardO + "\t" + cardSP + "\t" + cardPO + "\t" + cardSO; + + } + + @Override + public int compareTo(CardList o) { + + int cmp = cardS.compareTo(o.cardS); + if (cmp != 0) { + return cmp; + } + cmp = cardP.compareTo(o.cardP); + if (cmp != 0) { + return cmp; + } + cmp = cardO.compareTo(o.cardO); + if (cmp != 0) { + return cmp; + } + cmp 
= cardSP.compareTo(o.cardSP); + if (cmp != 0) { + return cmp; + } + cmp = cardPO.compareTo(o.cardPO); + if (cmp != 0) { + return cmp; + } + + return cardSO.compareTo(o.cardSO); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardinalityType.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardinalityType.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardinalityType.java new file mode 100644 index 0000000..fd62b52 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CardinalityType.java @@ -0,0 +1,148 @@ +package mvm.rya.joinselect.mr.utils; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +public class CardinalityType implements WritableComparable<CardinalityType> { + + private LongWritable card; + private Text cardType; + private LongWritable ts; + + public CardinalityType() { + card = new LongWritable(); + cardType = new Text(); + ts = new LongWritable(); + } + + public CardinalityType(int card, String cardType, long ts) { + + this.card = new LongWritable(card); + this.cardType = new Text(cardType); + this.ts = new LongWritable(ts); + + } + + public CardinalityType(LongWritable card, Text cardType, LongWritable ts) { + + this.card = card; + this.ts = ts; + this.cardType = cardType; + + } + + public void set(CardinalityType ct) { + this.card.set(ct.card.get()); + this.ts.set(ct.ts.get()); + this.cardType.set(ct.cardType); + } + + public void setCard(LongWritable card) { + this.card = card; + + } + + public void setCardType(Text cardType) { + this.cardType = cardType; + } + + public void setTS(LongWritable ts) { + this.ts = ts; + } + + public LongWritable getCard() { + return this.card; + } + + public Text getCardType() { + return this.cardType; + } + + public LongWritable getTS() { + return this.ts; + } + + @Override + public void write(DataOutput out) throws IOException { + card.write(out); + cardType.write(out); + ts.write(out); + + } + + @Override + public void readFields(DataInput in) throws IOException { + card.readFields(in); + cardType.readFields(in); + ts.readFields(in); + + } + + @Override + public int hashCode() { + int result = 7; + result = result * 17 + card.hashCode(); + result = result * 17 + cardType.hashCode(); + result = result * 17 + ts.hashCode(); + + return result; + + } + + @Override + public boolean equals(Object o) { + if (o instanceof CardinalityType) { + CardinalityType trip = (CardinalityType) o; 
+ return card.equals(trip.card) && cardType.equals(trip.cardType) && ts.equals(trip.ts); + + } + return false; + } + + @Override + public String toString() { + return card + " " + cardType + " " + ts; + + } + + @Override + public int compareTo(CardinalityType o) { + + int cmp = cardType.compareTo(o.cardType); + if (cmp != 0) { + return cmp; + } + cmp = ts.compareTo(o.ts); + if (cmp != 0) { + return cmp; + } + return card.compareTo(o.card); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CompositeType.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CompositeType.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CompositeType.java new file mode 100644 index 0000000..8a40e6a --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/CompositeType.java @@ -0,0 +1,121 @@ +package mvm.rya.joinselect.mr.utils; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +public class CompositeType implements WritableComparable<CompositeType> { + + private Text oldKey; + private IntWritable priority; + + public CompositeType() { + oldKey = new Text(); + priority = new IntWritable(); + } + + public CompositeType(String oldKey, int priority) { + this.oldKey = new Text(oldKey); + this.priority = new IntWritable(priority); + } + + public CompositeType(Text oldKey, IntWritable priority) { + + this.oldKey = oldKey; + this.priority = priority; + + } + + public void setOldKey(Text oldKey) { + this.oldKey = oldKey; + + } + + public void setPriority(IntWritable priority) { + this.priority = priority; + } + + public Text getOldKey() { + return this.oldKey; + } + + public IntWritable getPriority() { + return this.priority; + } + + @Override + public void write(DataOutput out) throws IOException { + oldKey.write(out); + priority.write(out); + + } + + @Override + public void readFields(DataInput in) throws IOException { + oldKey.readFields(in); + priority.readFields(in); + + } + + @Override + public int hashCode() { + int result = 7; + result = result * 17 + oldKey.hashCode(); + // result = result*17+ priority.hashCode(); + + return result; + + } + + @Override + public boolean equals(Object o) { + if (o instanceof CompositeType) { + CompositeType comp = (CompositeType) o; + return oldKey.equals(comp.oldKey) && priority.equals(comp.priority); + + } + return false; + } + + @Override + public String toString() { + return oldKey + "\t" + priority; + + } + + @Override + public int compareTo(CompositeType o) { + int compare = getOldKey().compareTo(o.getOldKey()); + if (compare != 0) { + return compare; + } + + return getPriority().compareTo(o.getPriority()); + + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectConstants.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectConstants.java new file mode 100644 index 0000000..6eac9e3 --- /dev/null +++ b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/utils/JoinSelectConstants.java @@ -0,0 +1,45 @@ +package mvm.rya.joinselect.mr.utils; + +/* + * #%L + * mvm.rya.rya.prospector + * %% + * Copyright (C) 2014 - 2015 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * String and byte-array constants shared by the join-selectivity code.
 */
public class JoinSelectConstants {

    // ---- general-purpose values ----

    public static final String COUNT = "count";
    public static final String METADATA = "metadata";
    /** A shared zero-length byte array. */
    public static final byte[] EMPTY = {};

    // ---- config properties: connection ----

    public static final String INSTANCE = "instance";
    public static final String ZOOKEEPERS = "zookeepers";
    public static final String USERNAME = "username";
    public static final String PASSWORD = "password";
    public static final String AUTHS = "auths";
    public static final String MOCK = "mock";

    // ---- config properties: tables ----

    public static final String PROSPECTS_TABLE = "prospects.table";
    public static final String SPO_TABLE = "spo.table";
    public static final String SELECTIVITY_TABLE = "selectivity.table";

    // ---- config properties: paths ----

    public static final String INPUTPATH = "inputpath";
    public static final String OUTPUTPATH = "outputpath";
    public static final String PROSPECTS_OUTPUTPATH = "prospects.outputpath";
    public static final String SPO_OUTPUTPATH = "spo.outputpath";

    // ---- config properties: miscellaneous ----

    public static final String PERFORMANT = "performant";

}
