http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/CountPlan.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/CountPlan.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/CountPlan.groovy deleted file mode 100644 index 091c295..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/CountPlan.groovy +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package mvm.rya.prospector.plans.impl - -import mvm.rya.api.domain.RyaStatement -import mvm.rya.prospector.domain.IndexEntry -import mvm.rya.prospector.domain.IntermediateProspect -import mvm.rya.prospector.domain.TripleValueType -import mvm.rya.prospector.plans.IndexWorkPlan -import mvm.rya.prospector.utils.CustomEntry -import mvm.rya.prospector.utils.ProspectorUtils - -import org.apache.accumulo.core.data.Mutation -import org.apache.accumulo.core.data.Range -import org.apache.accumulo.core.data.Value -import org.apache.accumulo.core.security.Authorizations -import org.apache.accumulo.core.security.ColumnVisibility -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapreduce.Reducer -import org.openrdf.model.util.URIUtil -import org.openrdf.model.vocabulary.XMLSchema; - -import static mvm.rya.prospector.utils.ProspectorConstants.COUNT; -import mvm.rya.api.RdfCloudTripleStoreConstants - -/** - * Date: 12/3/12 - * Time: 12:28 PM - */ -class CountPlan implements IndexWorkPlan { - - @Override - Collection<Map.Entry<IntermediateProspect, LongWritable>> map(RyaStatement ryaStatement) { - def subject = ryaStatement.getSubject() - def predicate = ryaStatement.getPredicate() - def subjpred = ryaStatement.getSubject().data + DELIM + ryaStatement.getPredicate().data - def predobj = ryaStatement.getPredicate().data + DELIM + ryaStatement.getObject().data - def subjobj = ryaStatement.getSubject().data + DELIM + ryaStatement.getObject().data - def object = ryaStatement.getObject() - def localIndex = URIUtil.getLocalNameIndex(subject.data) - def namespace = subject.data.substring(0, localIndex - 1) - def visibility = new String(ryaStatement.columnVisibility) - return [ - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: subject.data, - dataType: URITYPE, - tripleValueType: TripleValueType.subject, - visibility: visibility), - ONE), - new 
CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: predicate.data, - dataType: URITYPE, - tripleValueType: TripleValueType.predicate, - visibility: visibility - ), ONE), - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: object.data, - dataType: object.dataType.stringValue(), - tripleValueType: TripleValueType.object, - visibility: visibility - ), ONE), - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: subjpred, - dataType: XMLSchema.STRING, - tripleValueType: TripleValueType.subjectpredicate, - visibility: visibility - ), ONE), - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: subjobj, - dataType: XMLSchema.STRING, - tripleValueType: TripleValueType.subjectobject, - visibility: visibility - ), ONE), - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: predobj, - dataType: XMLSchema.STRING, - tripleValueType: TripleValueType.predicateobject, - visibility: visibility - ), ONE), - new CustomEntry<IntermediateProspect, LongWritable>( - new IntermediateProspect(index: COUNT, - data: namespace, - dataType: URITYPE, - tripleValueType: TripleValueType.entity, - visibility: visibility - ), ONE), - ] - } - - @Override - Collection<Map.Entry<IntermediateProspect, LongWritable>> combine(IntermediateProspect prospect, Iterable<LongWritable> counts) { - - def iter = counts.iterator() - long sum = 0; - iter.each { lw -> - sum += lw.get() - } - - return [new CustomEntry<IntermediateProspect, LongWritable>(prospect, new LongWritable(sum))] - } - - @Override - void reduce(IntermediateProspect prospect, Iterable<LongWritable> counts, Date timestamp, Reducer.Context context) { - def iter = counts.iterator() - long sum = 0; - iter.each { lw -> - sum += lw.get() - } - - def indexType = prospect.tripleValueType.name() - - // not 
sure if this is the best idea.. - if ((sum >= 0) || - indexType.equals(TripleValueType.predicate.toString())) { - - Mutation m = new Mutation(indexType + DELIM + prospect.data + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp)) - m.put(COUNT, prospect.dataType, new ColumnVisibility(prospect.visibility), timestamp.getTime(), new Value("${sum}".getBytes())); - - context.write(null, m); - } - } - - @Override - String getIndexType() { - return COUNT - } - - @Override - String getCompositeValue(List<String> indices){ - Iterator<String> indexIt = indices.iterator(); - String compositeIndex = indexIt.next(); - while (indexIt.hasNext()){ - String value = indexIt.next(); - compositeIndex += DELIM + value; - } - return compositeIndex; - } - - @Override - List<IndexEntry> query(def connector, String tableName, List<Long> prospectTimes, String type, String compositeIndex, String dataType, String[] auths) { - - assert connector != null && tableName != null && type != null && compositeIndex != null - - def bs = connector.createBatchScanner(tableName, new Authorizations(auths), 4) - def ranges = [] - int max = 1000; //by default only return 1000 prospects maximum - if (prospectTimes != null) { - prospectTimes.each { prospect -> - ranges.add( - new Range(type + DELIM + compositeIndex + DELIM + ProspectorUtils.getReverseIndexDateTime(new Date(prospect)))) - } - } else { - max = 1; //only return the latest if no prospectTimes given - def prefix = type + DELIM + compositeIndex + DELIM; - ranges.add(new Range(prefix, prefix + RdfCloudTripleStoreConstants.LAST)) - } - bs.ranges = ranges - if (dataType != null) { - bs.fetchColumn(new Text(COUNT), new Text(dataType)) - } else { - bs.fetchColumnFamily(new Text(COUNT)) - } - - List<IndexEntry> indexEntries = new ArrayList<IndexEntry>() - def iter = bs.iterator() - - while (iter.hasNext() && indexEntries.size() <= max) { - def entry = iter.next() - def k = entry.key - def v = entry.value - - def rowArr = 
k.row.toString().split(DELIM) - String values = ""; - // if it is a composite index, then return the type as a composite index - if (type.equalsIgnoreCase(TripleValueType.subjectpredicate.toString()) || - type.equalsIgnoreCase(TripleValueType.subjectobject.toString()) || - type.equalsIgnoreCase(TripleValueType.predicateobject.toString())){ - values =rowArr[1] + DELIM + rowArr[2] - } - else values = rowArr[1] - - indexEntries.add(new IndexEntry(data: values, - tripleValueType: rowArr[0], - index: COUNT, - dataType: k.columnQualifier.toString(), - visibility: k.columnVisibility.toString(), - count: Long.parseLong(new String(v.get())), - timestamp: k.timestamp - )) - } - bs.close() - - return indexEntries - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/ServicesBackedIndexWorkPlanManager.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/ServicesBackedIndexWorkPlanManager.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/ServicesBackedIndexWorkPlanManager.groovy deleted file mode 100644 index 6f3f7a6..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/plans/impl/ServicesBackedIndexWorkPlanManager.groovy +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package mvm.rya.prospector.plans.impl - -import mvm.rya.prospector.plans.IndexWorkPlan -import com.google.common.collect.Lists -import mvm.rya.prospector.plans.IndexWorkPlanManager - -/** - * Date: 12/3/12 - * Time: 11:24 AM - */ -class ServicesBackedIndexWorkPlanManager implements IndexWorkPlanManager { - - def Collection<IndexWorkPlan> plans - - ServicesBackedIndexWorkPlanManager() { - def iterator = ServiceLoader.load(IndexWorkPlan.class).iterator(); - plans = Lists.newArrayList(iterator) - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorService.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorService.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorService.groovy deleted file mode 100644 index bb8ceb4..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorService.groovy +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package mvm.rya.prospector.service - -import mvm.rya.prospector.utils.ProspectorUtils -import org.apache.accumulo.core.data.Key -import org.apache.accumulo.core.data.Range -import org.apache.accumulo.core.security.Authorizations -import org.apache.hadoop.io.Text - -import static mvm.rya.prospector.utils.ProspectorConstants.METADATA -import static mvm.rya.prospector.utils.ProspectorConstants.PROSPECT_TIME -import mvm.rya.prospector.plans.IndexWorkPlanManager -import mvm.rya.prospector.plans.impl.ServicesBackedIndexWorkPlanManager -import mvm.rya.prospector.plans.IndexWorkPlan -import mvm.rya.prospector.domain.IndexEntry - -/** - * Date: 12/5/12 - * Time: 12:28 PM - */ -class ProspectorService { - - def connector - String tableName - - IndexWorkPlanManager manager = new ServicesBackedIndexWorkPlanManager() - Map<String, IndexWorkPlan> plans - - ProspectorService(def connector, String tableName) { - this.connector = connector - this.tableName = tableName - this.plans = ProspectorUtils.planMap(manager.plans) - - //init - def tos = connector.tableOperations() - if(!tos.exists(tableName)) { - tos.create(tableName) - } - } - - public Iterator<Long> getProspects(String[] auths) { - - def scanner = connector.createScanner(tableName, new Authorizations(auths)) - scanner.setRange(Range.exact(METADATA)); - scanner.fetchColumnFamily(new Text(PROSPECT_TIME)); - - def iterator = scanner.iterator(); - - return new Iterator<Long>() { - - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Long next() { - return iterator.next().getKey().getTimestamp(); - } - - @Override - public void remove() { - iterator.remove(); - } - }; - - } - - public Iterator<Long> getProspectsInRange(long beginTime, long endTime, String[] auths) { - - def scanner = connector.createScanner(tableName, new Authorizations(auths)) - scanner.setRange(new Range( - new Key(METADATA, PROSPECT_TIME, ProspectorUtils.getReverseIndexDateTime(new Date(endTime)), "", 
Long.MAX_VALUE), - new Key(METADATA, PROSPECT_TIME, ProspectorUtils.getReverseIndexDateTime(new Date(beginTime)), "", 0l) - )) - def iterator = scanner.iterator(); - - return new Iterator<Long>() { - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Long next() { - return iterator.next().getKey().getTimestamp(); - } - - @Override - public void remove() { - iterator.remove(); - } - }; - - } - - public List<IndexEntry> query(List<Long> prospectTimes, String indexType, String type, List<String> index, String dataType, String[] auths) { - assert indexType != null - - def plan = plans.get(indexType) - assert plan != null: "Index Type: ${indexType} does not exist" - String compositeIndex = plan.getCompositeValue(index); - - return plan.query(connector, tableName, prospectTimes, type, compositeIndex, dataType, auths) - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAO.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAO.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAO.groovy deleted file mode 100644 index 3e8aba1..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/service/ProspectorServiceEvalStatsDAO.groovy +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package mvm.rya.prospector.service - -import mvm.rya.api.RdfCloudTripleStoreConfiguration -import mvm.rya.api.persist.RdfEvalStatsDAO -import mvm.rya.prospector.domain.TripleValueType -import mvm.rya.prospector.utils.ProspectorConstants -import org.apache.hadoop.conf.Configuration -import org.openrdf.model.Resource -import org.openrdf.model.Value - -import mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF - -/** - * An ${@link mvm.rya.api.persist.RdfEvalStatsDAO} that uses the Prospector Service underneath return counts. - */ -class ProspectorServiceEvalStatsDAO implements RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> { - - def ProspectorService prospectorService - - ProspectorServiceEvalStatsDAO() { - } - - ProspectorServiceEvalStatsDAO(ProspectorService prospectorService, RdfCloudTripleStoreConfiguration conf) { - this.prospectorService = prospectorService - } - - public ProspectorServiceEvalStatsDAO(def connector, RdfCloudTripleStoreConfiguration conf) { - this.prospectorService = new ProspectorService(connector, getProspectTableName(conf)) - } - - @Override - void init() { - assert prospectorService != null - } - - @Override - boolean isInitialized() { - return prospectorService != null - } - - @Override - void destroy() { - - } - - @Override - public double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<Value> val) { - - assert conf != null && card != null && val != null - String triplePart = null; - switch (card) { - case (CARDINALITY_OF.SUBJECT): - triplePart = TripleValueType.subject - break; - 
case (CARDINALITY_OF.PREDICATE): - triplePart = TripleValueType.predicate - break; - case (CARDINALITY_OF.OBJECT): - triplePart = TripleValueType.object - break; - case (CARDINALITY_OF.SUBJECTPREDICATE): - triplePart = TripleValueType.subjectpredicate - break; - case (CARDINALITY_OF.SUBJECTOBJECT): - triplePart = TripleValueType.subjectobject - break; - case (CARDINALITY_OF.PREDICATEOBJECT): - triplePart = TripleValueType.predicateobject - break; - } - - String[] auths = conf.getAuths() - List<String> indexedValues = new ArrayList<String>(); - Iterator<Value> valueIt = val.iterator(); - while (valueIt.hasNext()){ - indexedValues.add(valueIt.next().stringValue()); - } - - def indexEntries = prospectorService.query(null, ProspectorConstants.COUNT, triplePart, indexedValues, null /** what is the datatype here? */, - auths) - - return indexEntries.size() > 0 ? indexEntries.head().count : -1 - } - - @Override - double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<Value> val, Resource context) { - return getCardinality(conf, card, val) //TODO: Not sure about the context yet - } - - @Override - public void setConf(RdfCloudTripleStoreConfiguration conf) { - - } - - @Override - RdfCloudTripleStoreConfiguration getConf() { - return null - } - - public static String getProspectTableName(RdfCloudTripleStoreConfiguration conf) { - return conf.getTablePrefix() + "prospects"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/CustomEntry.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/CustomEntry.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/CustomEntry.groovy deleted file mode 100644 index c550b92..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/CustomEntry.groovy +++ /dev/null @@ -1,52 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package mvm.rya.prospector.utils - -/** - * Date: 12/3/12 - * Time: 12:33 PM - */ -class CustomEntry<K, V> implements Map.Entry<K, V> { - - K key; - V value; - - CustomEntry(K key, V value) { - this.key = key - this.value = value - } - - K getKey() { - return key - } - - void setKey(K key) { - this.key = key - } - - V getValue() { - return value - } - - V setValue(V value) { - this.value = value - this.value - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorConstants.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorConstants.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorConstants.groovy deleted file mode 100644 index 197e735..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorConstants.groovy +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package mvm.rya.prospector.utils - -/** - * Date: 12/5/12 - * Time: 10:57 AM - */ -class ProspectorConstants { - public static final String COUNT = "count" - public static final String METADATA = "metadata" - public static final String PROSPECT_TIME = "prospectTime" - public static final String DEFAULT_VIS = "U&FOUO" - public static final byte[] EMPTY = new byte [0]; - - //config properties - public static final String PERFORMANT = "performant" - - public static final String USERNAME = "username" - public static final String PASSWORD = "password" - public static final String INSTANCE = "instance" - public static final String ZOOKEEPERS = "zookeepers" - public static final String MOCK = "mock" -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorUtils.groovy ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorUtils.groovy b/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorUtils.groovy deleted file mode 100644 index 640f17e..0000000 --- a/extras/rya.prospector/src/main/groovy/mvm/rya/prospector/utils/ProspectorUtils.groovy +++ /dev/null @@ -1,138 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package mvm.rya.prospector.utils - -import org.apache.accumulo.core.client.Connector -import org.apache.accumulo.core.client.Instance -import org.apache.accumulo.core.client.ZooKeeperInstance -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat -import org.apache.accumulo.core.client.mock.MockInstance -import org.apache.accumulo.core.data.Mutation -import org.apache.accumulo.core.security.Authorizations -import org.apache.commons.lang.Validate -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.hadoop.mapreduce.Job - -import java.text.SimpleDateFormat -import mvm.rya.prospector.plans.IndexWorkPlan -import org.apache.accumulo.core.client.security.tokens.PasswordToken - -import static mvm.rya.prospector.utils.ProspectorConstants.* - -/** - * Date: 12/4/12 - * Time: 4:24 PM - */ -class ProspectorUtils { - - public static final long INDEXED_DATE_SORT_VAL = 999999999999999999L; // 18 char long, same length as date format pattern below - public static final String INDEXED_DATE_FORMAT = 
"yyyyMMddHHmmsssSSS"; - - public static String getReverseIndexDateTime(Date date) { - Validate.notNull(date); - String formattedDateString = new SimpleDateFormat(INDEXED_DATE_FORMAT).format(date); - long diff = INDEXED_DATE_SORT_VAL - Long.valueOf(formattedDateString); - - return Long.toString(diff); - } - - public static Map<String, IndexWorkPlan> planMap(def plans) { - plans.inject([:]) { map, plan -> - map.putAt(plan.indexType, plan) - map - } - } - - public static void initMRJob(Job job, String table, String outtable, String[] auths) { - Configuration conf = job.configuration - String username = conf.get(USERNAME) - String password = conf.get(PASSWORD) - String instance = conf.get(INSTANCE) - String zookeepers = conf.get(ZOOKEEPERS) - String mock = conf.get(MOCK) - - //input - if (Boolean.parseBoolean(mock)) { - AccumuloInputFormat.setMockInstance(job, instance) - AccumuloOutputFormat.setMockInstance(job, instance) - } else if (zookeepers != null) { - AccumuloInputFormat.setZooKeeperInstance(job, instance, zookeepers) - AccumuloOutputFormat.setZooKeeperInstance(job, instance, zookeepers) - } else { - throw new IllegalArgumentException("Must specify either mock or zookeepers"); - } - - AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes())) - AccumuloInputFormat.setInputTableName(job, table) - job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths)) - - // OUTPUT - job.setOutputFormatClass(AccumuloOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes())) - AccumuloOutputFormat.setDefaultTableName(job, outtable) - } - - public static void addMRPerformance(Configuration conf) { - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - 
conf.set("io.sort.mb", "256"); - conf.setBoolean("mapred.compress.map.output", true); - conf.set("mapred.map.output.compression.codec", GzipCodec.class.getName()); - } - - public static Instance instance(Configuration conf) { - assert conf != null - - String instance_str = conf.get(INSTANCE) - String zookeepers = conf.get(ZOOKEEPERS) - String mock = conf.get(MOCK) - if (Boolean.parseBoolean(mock)) { - return new MockInstance(instance_str) - } else if (zookeepers != null) { - return new ZooKeeperInstance(instance_str, zookeepers) - } else { - throw new IllegalArgumentException("Must specify either mock or zookeepers"); - } - } - - public static Connector connector(Instance instance, Configuration conf) { - String username = conf.get(USERNAME) - String password = conf.get(PASSWORD) - if (instance == null) - instance = instance(conf) - return instance.getConnector(username, password) - } - - public static void writeMutations(Connector connector, String tableName, def mutations) { - def bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4); - mutations.each { m -> - bw.addMutation(m) - } - bw.flush() - bw.close() - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java deleted file mode 100644 index 0ed8026..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/AccumuloSelectivityEvalDAO.java +++ /dev/null @@ -1,640 +0,0 @@ -package mvm.rya.joinselect; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfUtils; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RdfDAOException; -import mvm.rya.api.persist.RdfEvalStatsDAO; -import mvm.rya.api.persist.joinselect.SelectivityEvalDAO; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Resource; -import org.openrdf.model.URI; -import org.openrdf.query.algebra.QueryModelNode; -import org.openrdf.query.algebra.StatementPattern; -import 
org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.evaluation.impl.ExternalSet; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - - - - - -public class AccumuloSelectivityEvalDAO implements SelectivityEvalDAO<RdfCloudTripleStoreConfiguration> { - - private boolean initialized = false; - private RdfCloudTripleStoreConfiguration conf; - private Connector connector; - private TableLayoutStrategy tableLayoutStrategy; - private boolean filtered = false; - private boolean denormalized = false; - private int FullTableCardinality = 0; - private static final String DELIM = "\u0000"; - private Map<String,Long> joinMap = new HashMap<String,Long>();; - private RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> resd; - - @Override - public void init() throws RdfDAOException { - try { - if (isInitialized()) { - throw new IllegalStateException("Already initialized"); - } - if (!resd.isInitialized()) { - resd.init(); - } - checkNotNull(connector); - tableLayoutStrategy = conf.getTableLayoutStrategy(); - TableOperations tos = connector.tableOperations(); - AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getSelectivity()); - AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getProspects()); - initialized = true; - } catch (Exception e) { - throw new RdfDAOException(e); - } - } - - - public AccumuloSelectivityEvalDAO() { - - } - - - public AccumuloSelectivityEvalDAO(RdfCloudTripleStoreConfiguration conf, Connector connector) { - - this.conf = conf; - this.connector = connector; - } - - public AccumuloSelectivityEvalDAO(RdfCloudTripleStoreConfiguration conf) { - - this.conf = conf; - Instance inst = new ZooKeeperInstance(conf.get("sc.cloudbase.instancename"), conf.get("sc.cloudbase.zookeepers")); - try { - this.connector = inst.getConnector(conf.get("sc.cloudbase.username"), 
conf.get("sc.cloudbase.password")); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } - } - - @Override - public void destroy() throws RdfDAOException { - if (!isInitialized()) { - throw new IllegalStateException("Not initialized"); - } - initialized = false; - } - - @Override - public boolean isInitialized() throws RdfDAOException { - return initialized; - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - @Override - public RdfCloudTripleStoreConfiguration getConf() { - return conf; - } - - @Override - public void setConf(RdfCloudTripleStoreConfiguration conf) { - this.conf = conf; - } - - public RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> getRdfEvalDAO() { - return resd; - } - - public void setRdfEvalDAO(RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> resd) { - this.resd = resd; - } - - public void setFiltered(boolean filtered) { - this.filtered = filtered; - } - - - public void setDenormalized(boolean denormalize) { - this.denormalized = denormalize; - } - - private double getJoinSelect(RdfCloudTripleStoreConfiguration conf, StatementPattern sp1, StatementPattern sp2) throws TableNotFoundException { - - if (FullTableCardinality == 0) { - this.getTableSize(conf); - } - - Authorizations authorizations = getAuths(conf); - String row1 = CardinalityCalcUtil.getRow(sp1, true); - String row2 = CardinalityCalcUtil.getRow(sp2, true); - List<String> joinType = CardinalityCalcUtil.getJoins(sp1, sp2); - - if (joinType.size() == 0) { - return 1; - } - - if (joinType.size() == 2) { - - String cacheRow1; - String cacheRow2; - long card1 = 0; - long card2 = 0; - boolean contCard1 = false; - boolean contCard2 = false; - - cacheRow1 = row1 + DELIM + joinType.get(0); - cacheRow2 = row2 + DELIM + joinType.get(1); - - long count1 = getCardinality(conf, sp1); - long count2 = getCardinality(conf, 
sp2); - - if (count1 == 0 || count2 == 0) { - return 0; - } - - if (joinMap.containsKey(cacheRow1)) { - card1 = joinMap.get(cacheRow1); - contCard1 = true; - } - if (joinMap.containsKey(cacheRow2)) { - card2 = joinMap.get(cacheRow2); - contCard2 = true; - } - - if (!contCard1) { - Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations); - joinScanner.setRange(Range.prefix(row1)); - - for (Map.Entry<Key,Value> entry : joinScanner) { - if (entry.getKey().getColumnFamily().toString().equals(joinType.get(0))) { - card1 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow1, card1); - // System.out.println("Card1 is " + card1); - break; - } - } - } - - if (!contCard2) { - Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations); - joinScanner.setRange(Range.prefix(row2)); - for (Map.Entry<Key,Value> entry : joinScanner) { - if (entry.getKey().getColumnFamily().toString().equals(joinType.get(1))) { - card2 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow2, card2); - // System.out.println("Card2 is " + card2); - break; - } - } - - } - - if (!filtered && !denormalized) { - double temp1 = Math.min(((double) card1) / ((double) count1 * FullTableCardinality), ((double) card2) / ((double) count2 * FullTableCardinality)); - - double temp2 = Math.max((double) count1 / FullTableCardinality, (double) count2 / FullTableCardinality); - - // TODO maybe change back to original form as temp2 will rarely be less than temp1. 
- return Math.min(temp1, temp2); - } else if(denormalized) { - return Math.min(card1,card2); - } else { - - return Math.min(((double) card1 * count2) / ((double) count1 * FullTableCardinality * FullTableCardinality), ((double) card2 * count1) - / ((double) count2 * FullTableCardinality * FullTableCardinality)); - - } - } else { - - String cacheRow1 = row1 + DELIM + joinType.get(0); - String cacheRow2 = row1 + DELIM + joinType.get(1); - String cacheRow3 = row2 + DELIM + joinType.get(2); - String cacheRow4 = row2 + DELIM + joinType.get(3); - long card1 = 0; - long card2 = 0; - long card3 = 0; - long card4 = 0; - boolean contCard1 = false; - boolean contCard2 = false; - - long count1 = getCardinality(conf, sp1); - long count2 = getCardinality(conf, sp2); - - if (count1 == 0 || count2 == 0) { - return 0; - } - - if (joinMap.containsKey(cacheRow1) && joinMap.containsKey(cacheRow2)) { - card1 = joinMap.get(cacheRow1); - card2 = joinMap.get(cacheRow2); - contCard1 = true; - } - if (joinMap.containsKey(cacheRow3) && joinMap.containsKey(cacheRow4)) { - card3 = joinMap.get(cacheRow3); - card4 = joinMap.get(cacheRow4); - contCard2 = true; - } - - if (!contCard1) { - Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations); - joinScanner.setRange(Range.prefix(row1)); - boolean found1 = false; - boolean found2 = false; - - for (Map.Entry<Key,Value> entry : joinScanner) { - - if (entry.getKey().getColumnFamily().toString().equals(joinType.get(0))) { - card1 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow1, card1); - found1 = true; - // System.out.println("Card1 is " + card1); - if (found1 && found2) { - card1 = Math.min(card1, card2); - break; - } - } else if (entry.getKey().getColumnFamily().toString().equals(joinType.get(1))) { - card2 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow2, card2); - found2 = true; - // System.out.println("Card1 is " + card1); - if (found1 && found2) { - card1 = 
Math.min(card1, card2); - break; - } - } - } - } - - if (!contCard2) { - Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations); - joinScanner.setRange(Range.prefix(row2)); - boolean found1 = false; - boolean found2 = false; - for (Map.Entry<Key,Value> entry : joinScanner) { - if (entry.getKey().getColumnFamily().toString().equals(joinType.get(2))) { - card3 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow3, card3); - found1 = true; - // System.out.println("Card2 is " + card2); - if (found1 && found2) { - card3 = Math.min(card3, card4); - break; - } - } else if (entry.getKey().getColumnFamily().toString().equals(joinType.get(3))) { - card4 = CardinalityCalcUtil.getJCard(entry.getKey()); - joinMap.put(cacheRow4, card4); - found2 = true; - // System.out.println("Card1 is " + card1); - if (found1 && found2) { - card3 = Math.min(card3, card4); - break; - } - } - } - - } - - if (!filtered && !denormalized) { - return Math.min(((double) card1) / ((double) count1 * FullTableCardinality), ((double) card3) / ((double) count2 * FullTableCardinality)); - } else if(denormalized) { - return Math.min(card1,card3); - } else { - return Math.min(((double) card1 * count2) / ((double) count1 * FullTableCardinality * FullTableCardinality), ((double) card3 * count1) - / ((double) count2 * FullTableCardinality * FullTableCardinality)); - - } - - } - - } - - // TODO currently computes average selectivity of sp1 with each node in TupleExpr te (is this best?) 
- private double getSpJoinSelect(RdfCloudTripleStoreConfiguration conf, TupleExpr te, StatementPattern sp1) - throws TableNotFoundException { - - // System.out.println("Tuple is " + te + " and sp is " + sp1); - - if (te instanceof StatementPattern) { - return getJoinSelect(conf, (StatementPattern) te, sp1); - } else { - - SpExternalCollector spe = new SpExternalCollector(); - te.visit(spe); - List<QueryModelNode> espList = spe.getSpExtTup(); - - if (espList.size() == 0) { - - Set<String> tupBn = te.getAssuredBindingNames(); - Set<String> eBn = sp1.getAssuredBindingNames(); - Set<String> intersect = Sets.intersection(tupBn, eBn); - - return Math.pow(1.0 / 10000.0, intersect.size()); - - } - - double min = Double.MAX_VALUE; - double select = Double.MAX_VALUE; - - for (QueryModelNode node : espList) { - - if (node instanceof StatementPattern) - select = getJoinSelect(conf, sp1, (StatementPattern) node); - else if (node instanceof ExternalSet) { - select = getExtJoinSelect(sp1, (ExternalSet) node); - } - - if (min > select) { - min = select; - } - } - // System.out.println("Max is " + max); - return min; - } - } - - public double getJoinSelect(RdfCloudTripleStoreConfiguration conf, TupleExpr te1, TupleExpr te2) throws TableNotFoundException { - - SpExternalCollector spe = new SpExternalCollector(); - te2.visit(spe); - List<QueryModelNode> espList = spe.getSpExtTup(); - - double min = Double.MAX_VALUE; - - for (QueryModelNode node : espList) { - double select = getSelectivity(conf, te1, node); - if (min > select) { - min = select; - } - } - - return min; - } - - - - - private double getSelectivity(RdfCloudTripleStoreConfiguration conf, TupleExpr te, QueryModelNode node) throws TableNotFoundException { - - if ((node instanceof StatementPattern)) { - return getSpJoinSelect(conf, te, (StatementPattern) node); - - } else if (node instanceof ExternalSet) { - - return getExtJoinSelect(te, (ExternalSet) node); - - } else { - return 0; - } - - } - - - - - - private double 
getExtJoinSelect(TupleExpr te, ExternalSet eSet) { - - Set<String> tupBn = te.getAssuredBindingNames(); - Set<String> eBn = eSet.getAssuredBindingNames(); - Set<String> intersect = Sets.intersection(tupBn, eBn); - - return Math.pow(1.0 / 10000.0, intersect.size()); - - } - - - - - - - - - - - - // obtains cardinality for StatementPattern. Returns cardinality of 0 - // if no instances of constants occur in table. - // assumes composite cardinalities will be used. - @Override - public long getCardinality(RdfCloudTripleStoreConfiguration conf, StatementPattern sp) throws TableNotFoundException { - - Var subjectVar = sp.getSubjectVar(); - Resource subj = (Resource) getConstantValue(subjectVar); - Var predicateVar = sp.getPredicateVar(); - URI pred = (URI) getConstantValue(predicateVar); - Var objectVar = sp.getObjectVar(); - org.openrdf.model.Value obj = getConstantValue(objectVar); - Resource context = (Resource) getConstantValue(sp.getContextVar()); - - /** - * We put full triple scans before rdf:type because more often than not the triple scan is being joined with something else that is better than asking the - * full rdf:type of everything. 
- */ - double cardinality = 0; - try { - cardinality = 2*getTableSize(conf); - } catch (Exception e1) { - e1.printStackTrace(); - } - try { - if (subj != null) { - List<org.openrdf.model.Value> values = new ArrayList<org.openrdf.model.Value>(); - CARDINALITY_OF card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECT; - values.add(subj); - - if (pred != null) { - values.add(pred); - card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECTPREDICATE; - } else if (obj != null) { - values.add(obj); - card = RdfEvalStatsDAO.CARDINALITY_OF.SUBJECTOBJECT; - } - - double evalCard = this.getCardinality(conf, card, values, context); - // the cardinality will be -1 if there was no value found (if - // the index does not exist) - if (evalCard >= 0) { - cardinality = Math.min(cardinality, evalCard); - } else { - // TODO change this to agree with prospector - cardinality = 0; - } - } else if (pred != null) { - List<org.openrdf.model.Value> values = new ArrayList<org.openrdf.model.Value>(); - CARDINALITY_OF card = RdfEvalStatsDAO.CARDINALITY_OF.PREDICATE; - values.add(pred); - - if (obj != null) { - values.add(obj); - card = RdfEvalStatsDAO.CARDINALITY_OF.PREDICATEOBJECT; - } - - double evalCard = this.getCardinality(conf, card, values, context); - if (evalCard >= 0) { - cardinality = Math.min(cardinality, evalCard); - } else { - // TODO change this to agree with prospector - cardinality = 0; - } - } else if (obj != null) { - List<org.openrdf.model.Value> values = new ArrayList<org.openrdf.model.Value>(); - values.add(obj); - double evalCard = this.getCardinality(conf, RdfEvalStatsDAO.CARDINALITY_OF.OBJECT, values, context); - if (evalCard >= 0) { - cardinality = Math.min(cardinality, evalCard); - } else { - // TODO change this to agree with prospector - cardinality = 0; - } - } else { - cardinality = getTableSize(conf); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - // TODO is this okay? 
- return (long) cardinality; - } - - private org.openrdf.model.Value getConstantValue(Var var) { - if (var != null) - return var.getValue(); - else - return null; - } - - public double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<org.openrdf.model.Value> val) throws RdfDAOException { - return resd.getCardinality(conf, card, val); - } - - public double getCardinality(RdfCloudTripleStoreConfiguration conf, CARDINALITY_OF card, List<org.openrdf.model.Value> val, Resource context) throws RdfDAOException { - - return resd.getCardinality(conf, card, val, context); - - } - - public int getTableSize(RdfCloudTripleStoreConfiguration conf) throws TableNotFoundException { - - Authorizations authorizations = getAuths(conf); - - - if (joinMap.containsKey("subjectpredicateobject" + DELIM + "FullTableCardinality")) { - FullTableCardinality = joinMap.get("subjectpredicateobject" + DELIM + "FullTableCardinality").intValue(); - return FullTableCardinality; - } - - if (FullTableCardinality == 0) { - Scanner joinScanner = connector.createScanner(tableLayoutStrategy.getSelectivity(), authorizations); - joinScanner.setRange(Range.prefix(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality"))); - Iterator<Map.Entry<Key,Value>> iterator = joinScanner.iterator(); - if (iterator.hasNext()) { - Map.Entry<Key,Value> entry = iterator.next(); - if (entry.getKey().getColumnFamily().toString().equals("FullTableCardinality")) { - String Count = entry.getKey().getColumnQualifier().toString(); - FullTableCardinality = Integer.parseInt(Count); - } - } - if (FullTableCardinality == 0) { - throw new RuntimeException("Table does not contain full cardinality"); - } - - } - - return FullTableCardinality; - - } - - - private Authorizations getAuths(RdfCloudTripleStoreConfiguration conf) { - String[] auths = conf.getAuths(); - Authorizations authorizations = null; - if (auths == null || auths.length == 0) { - authorizations = new Authorizations(); - } else { - 
authorizations = new Authorizations(auths); - } - - return authorizations; - } - - - - private static class SpExternalCollector extends QueryModelVisitorBase<RuntimeException> { - - private List<QueryModelNode> eSet = Lists.newArrayList(); - - - @Override - public void meetNode(QueryModelNode node) throws RuntimeException { - if (node instanceof ExternalSet || node instanceof StatementPattern) { - eSet.add(node); - } - super.meetNode(node); - } - - public List<QueryModelNode> getSpExtTup() { - return eSet; - } - - } - - - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java deleted file mode 100644 index a54a5af..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/CardinalityCalcUtil.java +++ /dev/null @@ -1,267 +0,0 @@ -package mvm.rya.joinselect; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import java.util.ArrayList; -import java.util.List; - -import org.apache.accumulo.core.data.Key; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.Var; - -public class CardinalityCalcUtil { - - private static final String DELIM = "\u0000"; - - private static String intToTriplePlace(int i) { - - int place = i; - - switch (place) { - - case 0: - return "subject"; - - case 1: - return "predicate"; - - case 2: - return "object"; - - default: - throw new IllegalArgumentException("Invalid integer triple place."); - - } - - } - - private static int triplePlaceToInt(String s) { - - if (s.equals("subject")) { - return 0; - } else if (s.equals("predicate")) { - return 1; - } else if (s.equals("object")) { - return 2; - } else - throw new IllegalArgumentException("Invalid triple place."); - - } - - private static List<String> getVariablePos(StatementPattern sp) { - - List<String> posList = new ArrayList<String>(); - List<Var> varList = sp.getVarList(); - - for (int i = 0; i < 3; i++) { - if (!varList.get(i).isConstant()) { - posList.add(intToTriplePlace(i)); - - } - } - - return posList; - - } - - private static List<String> getConstantPos(StatementPattern sp) { - - List<String> posList = new ArrayList<String>(); - List<Var> varList = sp.getVarList(); - - for (int i = 0; i < 3; i++) { - if (varList.get(i).isConstant()) { - posList.add(intToTriplePlace(i)); - - } - } - - return posList; - - } - - // assumes sp contains at most two constants - // TODO might not be good if all variable sp is needed to get table size - public static String getRow(StatementPattern sp, boolean joinTable) { - - String row = ""; - String values = ""; - List<Var> varList = sp.getVarList(); - List<String> constList = CardinalityCalcUtil.getConstantPos(sp); - int i; - - for (String s : constList) { - - i = CardinalityCalcUtil.triplePlaceToInt(s); - - if (row.equals("subject") && s.equals("object") && joinTable) { - row = s + row; - if (values.length() == 
0) { - values = values + removeQuotes(varList.get(i).getValue().toString()); - } else { - values = removeQuotes(varList.get(i).getValue().toString()) + DELIM + values; - } - } else { - row = row + s; - if (values.length() == 0) { - values = values + removeQuotes(varList.get(i).getValue().toString()); - } else { - values = values + DELIM + removeQuotes(varList.get(i).getValue().toString()); - } - } - - } - - return (row + DELIM + values); - - } - - - - - private static String removeQuotes(String s) { - String trim = s.trim(); - if (trim.substring(0, 1).equals("\"")) { - trim = trim.substring(1, trim.length() - 1); - } - return trim; - } - - - - - - public static long getJCard(Key key) { - - String s = key.getColumnQualifier().toString(); - return Long.parseLong(s); - - } - - //determines a list of the positions in which two SPs have a common variable - private static List<String> getJoinType(StatementPattern sp1, StatementPattern sp2) { - - List<String> joinList = new ArrayList<String>(); - List<Var> spList1 = sp1.getVarList(); - List<Var> spList2 = sp2.getVarList(); - - List<String> pos1 = CardinalityCalcUtil.getVariablePos(sp1); - List<String> pos2 = CardinalityCalcUtil.getVariablePos(sp2); - - int i, j; - - for (String s : pos1) { - for (String t : pos2) { - i = CardinalityCalcUtil.triplePlaceToInt(s); - j = CardinalityCalcUtil.triplePlaceToInt(t); - - if (spList1.get(i).getName().equals(spList2.get(j).getName())) { - joinList.add(s); - joinList.add(t); - - } - - } - } - if (joinList.size() == 4) { - return orderJoinType(joinList); - } - - return joinList; - - } - - // assumes list size is four - private static List<String> orderJoinType(List<String> jList) { - - List<String> tempList = new ArrayList<String>(); - - if (jList.get(0).equals("subject") && jList.get(2).equals("object")) { - tempList.add(jList.get(2)); - tempList.add(jList.get(0)); - tempList.add(jList.get(3)); - tempList.add(jList.get(1)); - return tempList; - } else { - tempList.add(jList.get(0)); - 
tempList.add(jList.get(2)); - tempList.add(jList.get(1)); - tempList.add(jList.get(3)); - return tempList; - } - - } - - // assumes size is four - private static List<String> reverseJoinType(List<String> jList) { - - List<String> tempList = new ArrayList<String>(); - - if (jList.get(2).equals("subject") && jList.get(3).equals("object")) { - tempList.add(jList.get(3)); - tempList.add(jList.get(2)); - tempList.add(jList.get(1)); - tempList.add(jList.get(0)); - return tempList; - } else if (jList.get(2).equals("predicate") && jList.get(3).equals("subject")) { - tempList.add(jList.get(3)); - tempList.add(jList.get(2)); - tempList.add(jList.get(1)); - tempList.add(jList.get(0)); - return tempList; - } else if (jList.get(2).equals("object") && jList.get(3).equals("predicate")) { - tempList.add(jList.get(3)); - tempList.add(jList.get(2)); - tempList.add(jList.get(1)); - tempList.add(jList.get(0)); - return tempList; - } else { - tempList.add(jList.get(2)); - tempList.add(jList.get(3)); - tempList.add(jList.get(0)); - tempList.add(jList.get(1)); - return tempList; - } - } - - public static List<String> getJoins(StatementPattern sp1, StatementPattern sp2) { - List<String> jList = new ArrayList<String>(); - List<String> list = getJoinType(sp1, sp2); - if (list.size() == 0) { - return list; - } else if (list.size() == 2) { - jList.add(list.get(0) + list.get(1)); - jList.add(list.get(1) + list.get(0)); - return jList; - } else { - - list = orderJoinType(list); - jList.add(list.get(0) + list.get(1) + list.get(2) + list.get(3)); - jList.add(list.get(0) + list.get(1) + list.get(3) + list.get(2)); - list = reverseJoinType(list); - jList.add(list.get(0) + list.get(1) + list.get(2) + list.get(3)); - jList.add(list.get(0) + list.get(1) + list.get(3) + list.get(2)); - return jList; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java 
---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java deleted file mode 100644 index 5d3d643..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/FullTableSize.java +++ /dev/null @@ -1,129 +0,0 @@ -package mvm.rya.joinselect.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SELECTIVITY_TABLE; -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_TABLE; - -import java.io.IOException; - -import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class FullTableSize extends Configured implements Tool { - - private static final String DELIM = "\u0000"; - - - - - - public static void main(String[] args) throws Exception { - ToolRunner.run(new FullTableSize(), args); - } - - - - - - - public static class FullTableMapper extends Mapper<Key,Value,Text,IntWritable> { - private static final IntWritable ONE = new IntWritable(1); - - - @Override - public void map(Key key, Value value, Context context) throws IOException, InterruptedException { - context.write(new Text("COUNT"), ONE); - } - } - - public static class FullTableReducer extends Reducer<Text,IntWritable,Text,Mutation> { - - @Override - public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { - int count = 0; - - for (IntWritable i : values) { - count += i.get(); - } - - String countStr = Integer.toString(count); - - Mutation m = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); - m.put(new Text("FullTableCardinality"), new Text(countStr), new Value(new byte[0])); - - context.write(new Text(""), m); - } - } - - public static 
class FullTableCombiner extends Reducer<Text,IntWritable,Text,IntWritable> { - - @Override - public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { - - int count = 0; - - for (IntWritable i : values) { - count += i.get(); - } - - context.write(key, new IntWritable(count)); - } - } - - @Override - public int run(String[] args) throws Exception { - - Configuration conf = getConf(); - String inTable = conf.get(SPO_TABLE); - String outTable = conf.get(SELECTIVITY_TABLE); - String auths = conf.get(AUTHS); - - assert inTable != null && outTable != null; - - Job job = new Job(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); - job.setJarByClass(this.getClass()); - JoinSelectStatsUtil.initTableMRJob(job, inTable, outTable, auths); - job.setMapperClass(FullTableMapper.class); - job.setCombinerClass(FullTableCombiner.class); - job.setReducerClass(FullTableReducer.class); - job.setNumReduceTasks(1); - - job.waitForCompletion(true); - - return job.isSuccessful() ? 0 : 1; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java deleted file mode 100644 index bb227f3..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectAggregate.java +++ /dev/null @@ -1,272 +0,0 @@ -package mvm.rya.joinselect.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS; -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.OUTPUTPATH; -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH; -import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.SPO_OUTPUTPATH; - -import java.io.IOException; - -import mvm.rya.joinselect.mr.utils.CardList; -import mvm.rya.joinselect.mr.utils.CardinalityType; -import mvm.rya.joinselect.mr.utils.CompositeType; -import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil; -import mvm.rya.joinselect.mr.utils.TripleCard; -import mvm.rya.joinselect.mr.utils.TripleEntry; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Partitioner; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; - -public class JoinSelectAggregate extends Configured implements Tool { - - public static class JoinSelectAggregateMapper extends Mapper<CompositeType,TripleCard,CompositeType,TripleCard> { - - public void map(CompositeType key, TripleCard value, Context context) throws IOException, 
InterruptedException { - - context.write(key, value); - - } - - } - - public static class JoinReducer extends Reducer<CompositeType,TripleCard,TripleEntry,CardList> { - - public void reduce(CompositeType key, Iterable<TripleCard> values, Context context) throws IOException, InterruptedException { - - CardinalityType card; - TripleEntry triple; - CardinalityType subjectCard = null; - CardinalityType objectCard = null; - CardinalityType predicateCard = null; - CardinalityType spCard = null; - CardinalityType soCard = null; - CardinalityType poCard = null; - CardList cList = new CardList((long) 0, (long) 0, (long) 0, (long) 0, (long) 0, (long) 0); - boolean listEmpty = true; - - // System.out.println("********************************************************************"); - // System.out.println("Key is " + key ); - - for (TripleCard val : values) { - - // System.out.println("Value in iterable is " + val); - if (!val.isCardNull()) { - card = val.getCard(); - - if (card.getCardType().toString().equals("object")) { - if (objectCard == null) { - objectCard = new CardinalityType(); - objectCard.set(card); - - } else if (objectCard.compareTo(card) > 0) { - // System.out.println(objectCard.compareTo(card)); - objectCard.set(card); - - } - - } else if (card.getCardType().toString().equals("predicate")) { - // System.out.println("Coming in here?"); - if (predicateCard == null) { - predicateCard = new CardinalityType(); - predicateCard.set(card); - - } else if (predicateCard.compareTo(card) > 0) { - predicateCard.set(card); - - } - } else if (card.getCardType().toString().equals("subject")) { - if (subjectCard == null) { - subjectCard = new CardinalityType(); - subjectCard.set(card); - - } else if (subjectCard.compareTo(card) > 0) { - subjectCard.set(card); - } - - } else if (card.getCardType().toString().equals("subjectpredicate")) { - if (spCard == null) { - spCard = new CardinalityType(); - spCard.set(card); - - } else if (spCard.compareTo(card) > 0) { - spCard.set(card); - 
- } - } else if (card.getCardType().toString().equals("subjectobject")) { - if (soCard == null) { - soCard = new CardinalityType(); - soCard.set(card); - - } else if (soCard.compareTo(card) > 0) { - soCard.set(card); - - } - } else if (card.getCardType().toString().equals("predicateobject")) { - if (poCard == null) { - poCard = new CardinalityType(); - poCard.set(card); - - } else if (poCard.compareTo(card) > 0) { - poCard.set(card); - - } - } - - } else { - - if (listEmpty) { - if (subjectCard != null || predicateCard != null || objectCard != null) { - - if (subjectCard != null) { - cList.setSCard(subjectCard.getCard().get()); - } - if (predicateCard != null) { - cList.setPCard(predicateCard.getCard().get()); - } - if (objectCard != null) { - cList.setOCard(objectCard.getCard().get()); - } - - listEmpty = false; - - } else if (spCard != null || poCard != null || soCard != null) { - - if (spCard != null) { - cList.setSPCard(spCard.getCard().get()); - } - if (poCard != null) { - cList.setPOCard(poCard.getCard().get()); - } - if (soCard != null) { - cList.setSOCard(soCard.getCard().get()); - } - - listEmpty = false; - } - - // System.out.println("Cardlist is " + cList); - // System.out.println("Cards are " + - // subjectCard.getCard() + "," + predicateCard.getCard() - // + - // "," + objectCard.getCard() + "," + spCard.getCard() + - // "," + poCard.getCard() + "," + soCard.getCard()); - // - } - - // only write record if cardList contains at least one - // nonzero entry - if (!val.isTeNull() && !listEmpty) { - - triple = (TripleEntry) val.getTE(); - - context.write(triple, cList); - // System.out.println("Triple is " + triple + - // " and cardinality is " + cList); - - } - - } - } - - } - - } - - public static class JoinSelectPartitioner extends Partitioner<CompositeType,TripleCard> { - - @Override - public int getPartition(CompositeType key, TripleCard value, int numPartitions) { - return Math.abs(key.getOldKey().hashCode() * 127) % numPartitions; - } - - } - - 
public static class JoinSelectGroupComparator extends WritableComparator { - - protected JoinSelectGroupComparator() { - super(CompositeType.class, true); - } - - @SuppressWarnings("rawtypes") - @Override - public int compare(WritableComparable w1, WritableComparable w2) { - CompositeType ct1 = (CompositeType) w1; - CompositeType ct2 = (CompositeType) w2; - return ct1.getOldKey().compareTo(ct2.getOldKey()); - } - - } - - public static class JoinSelectSortComparator extends WritableComparator { - - protected JoinSelectSortComparator() { - super(CompositeType.class, true); - } - - @SuppressWarnings("rawtypes") - @Override - public int compare(WritableComparable w1, WritableComparable w2) { - CompositeType ct1 = (CompositeType) w1; - CompositeType ct2 = (CompositeType) w2; - return ct1.compareTo(ct2); - } - - } - - @Override - public int run(String[] args) throws Exception { - Configuration conf = getConf(); - String inPath1 = conf.get(PROSPECTS_OUTPUTPATH); - String inPath2 = conf.get(SPO_OUTPUTPATH); - String auths = conf.get(AUTHS); - String outPath = conf.get(OUTPUTPATH); - - assert inPath1 != null && inPath2 != null && outPath != null; - - Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis()); - job.setJarByClass(this.getClass()); - conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true); - - JoinSelectStatsUtil.initJoinMRJob(job, inPath1, inPath2, JoinSelectAggregateMapper.class, outPath, auths); - - job.setSortComparatorClass(JoinSelectSortComparator.class); - job.setGroupingComparatorClass(JoinSelectGroupComparator.class); - job.setPartitionerClass(JoinSelectPartitioner.class); - job.setReducerClass(JoinReducer.class); - job.setNumReduceTasks(32); - job.waitForCompletion(true); - - return job.isSuccessful() ? 
0 : 1; - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java deleted file mode 100644 index e6a89ce..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectDriver.java +++ /dev/null @@ -1,60 +0,0 @@ -package mvm.rya.joinselect.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -public class JoinSelectDriver extends Configured implements Tool { - - public static void main(String[] args) throws Exception { - ToolRunner.run(new JoinSelectDriver(), args); - } - - @Override - public int run(String[] args) throws Exception { - - Configuration conf = getConf(); - System.out.println("Zookeepers are " + conf.get("zookeepers")); - - int res; - res = ToolRunner.run(conf, new FullTableSize(), args); - - if (res == 0) { - res = ToolRunner.run(conf, new JoinSelectSpoTableOutput(), args); - } - if (res == 0) { - res = ToolRunner.run(conf, new JoinSelectProspectOutput(), args); - } - if (res == 0) { - res = ToolRunner.run(conf, new JoinSelectAggregate(), args); - } - if (res == 0) { - res = ToolRunner.run(conf, new JoinSelectStatisticsSum(), args); - } - - return res; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java ---------------------------------------------------------------------- diff --git a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java b/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java deleted file mode 100644 index a12793d..0000000 --- a/extras/rya.prospector/src/main/java/mvm/rya/joinselect/mr/JoinSelectProspectOutput.java +++ /dev/null @@ -1,124 +0,0 @@ -package mvm.rya.joinselect.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */



import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS;
import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH;
import static mvm.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_TABLE;

import java.io.IOException;
import java.util.regex.Pattern;

import mvm.rya.joinselect.mr.utils.CardinalityType;
import mvm.rya.joinselect.mr.utils.CompositeType;
import mvm.rya.joinselect.mr.utils.JoinSelectStatsUtil;
import mvm.rya.joinselect.mr.utils.TripleCard;

import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;

/**
 * Map-only job that reads cardinality rows from the prospects table and
 * re-keys them as (CompositeType, TripleCard) pairs for the join with the
 * SPO table output.
 */
public class JoinSelectProspectOutput extends Configured implements Tool {

  public static class CardinalityMapper extends Mapper<Key,Value,CompositeType,TripleCard> {

    private static final String DELIM = "\u0000";

    Text inText = new Text();
    // Compiled once and reused across map() calls; row layout appears to be
    // type DELIM key [DELIM key2] DELIM timestamp.
    Pattern splitPattern = Pattern.compile(DELIM);

    /**
     * Parses one prospects-table row. Single-component rows
     * (subject/object/predicate) have 3 fields; two-component rows
     * (subjectpredicate/subjectobject/predicateobject) have 4. Anything else
     * (malformed rows) is silently skipped, as before.
     */
    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {

      key.getRow(inText);
      String[] cardData = splitPattern.split(inText.toString().trim(), 4);

      if (cardData.length == 3
          && (cardData[0].equals("subject") || cardData[0].equals("object") || cardData[0].equals("predicate"))) {
        writeCard(context, cardData[0], cardData[1], cardData[2], data);
      } else if (cardData.length == 4
          && (cardData[0].equals("subjectpredicate") || cardData[0].equals("subjectobject") || cardData[0].equals("predicateobject"))) {
        // Two-component key: join key is key1 DELIM key2.
        writeCard(context, cardData[0], cardData[1] + DELIM + cardData[2], cardData[3], data);
      }

    }

    /**
     * Emits one (CompositeType, TripleCard) pair. The IntWritable(1) priority
     * marks this as a cardinality record so it sorts ahead of triple records
     * in the downstream join. (The original built identical throwaway
     * cType/tCard objects and then constructed them again for the write;
     * the duplicates are gone.)
     */
    private void writeCard(Context context, String type, String joinKey, String timestamp, Value data)
        throws IOException, InterruptedException {
      LongWritable ts = new LongWritable(Long.parseLong(timestamp));
      // NOTE(review): Value bytes decoded with the platform default charset,
      // as in the original — presumably ASCII digits, so this is safe.
      LongWritable card = new LongWritable(Long.parseLong(new String(data.get())));
      context.write(new CompositeType(new Text(joinKey), new IntWritable(1)),
          new TripleCard(new CardinalityType(card, new Text(type), ts)));
    }

  }

  /**
   * Configures and runs the table-to-sequence-file job and blocks until it
   * completes.
   *
   * @return 0 on success, 1 on failure
   * @throws IllegalArgumentException if the input table or output path is missing
   */
  @Override
  public int run(String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException {

    Configuration conf = getConf();
    String inTable = conf.get(PROSPECTS_TABLE);
    String auths = conf.get(AUTHS);
    String outPath = conf.get(PROSPECTS_OUTPUTPATH);

    // Fail fast with a clear message instead of `assert`, which is a no-op
    // unless the JVM runs with -ea.
    if (inTable == null || outPath == null) {
      throw new IllegalArgumentException("Configuration must supply the prospects table and output path");
    }

    // Set classpath precedence BEFORE constructing the Job: Job copies the
    // Configuration, so mutating `conf` afterwards never reached the job.
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
    job.setJarByClass(this.getClass());

    JoinSelectStatsUtil.initTabToSeqFileJob(job, inTable, outPath, auths);
    job.setMapperClass(CardinalityMapper.class);

    // Map-only job: no reducers.
    job.setNumReduceTasks(0);

    job.waitForCompletion(true);

    return job.isSuccessful() ? 0 : 1;

  }

}
