http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java new file mode 100644 index 0000000..aa5157b --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java @@ -0,0 +1,39 @@ +package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; + +/** + * Interface AccumuloRdfConstants + * Date: Mar 1, 2012 + * Time: 7:24:52 PM + */ +public interface AccumuloRdfConstants { + public static final Authorizations ALL_AUTHORIZATIONS = Constants.NO_AUTHS; + + public static final Value EMPTY_VALUE = new Value(new byte[0]); + + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]); +}
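These constants are the shared placeholders the rest of the DAO code reaches for when a cell needs no visibility label or no payload. A minimal usage sketch (not part of the commit; the table name, row, and connector setup are illustrative):

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.io.Text;

    import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV;
    import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;

    public class ConstantsSketch {
        // Writes one cell carrying no visibility expression and no payload;
        // both placeholders come from AccumuloRdfConstants instead of being
        // allocated fresh at every call site.
        static void writePlaceholder(Connector connector, String table) throws Exception {
            BatchWriter bw = connector.createBatchWriter(table, new BatchWriterConfig());
            Mutation m = new Mutation(new Text("someRow"));
            m.put(new Text("cf"), new Text("cq"), EMPTY_CV, System.currentTimeMillis(), EMPTY_VALUE);
            bw.addMutation(m);
            bw.close();
        }
    }

Holding constants in an interface is an older Java idiom; implementors and callers typically static-import them as the classes below do.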
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
new file mode 100644
index 0000000..65fad20
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java
@@ -0,0 +1,172 @@
+package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT;
+import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import mvm.rya.api.RdfCloudTripleStoreStatement;
+import mvm.rya.api.layout.TableLayoutStrategy;
+import mvm.rya.api.persist.RdfDAOException;
+import mvm.rya.api.persist.RdfEvalStatsDAO;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Value;
+
+/**
+ * Class AccumuloRdfEvalStatsDAO
+ * Date: Feb 28, 2012
+ * Time: 5:03:16 PM
+ */
+public class AccumuloRdfEvalStatsDAO implements RdfEvalStatsDAO<AccumuloRdfConfiguration> {
+
+    private boolean initialized = false;
+    private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+
+    private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>();
+    private Connector connector;
+
+    // private String evalTable = TBL_EVAL;
+    private TableLayoutStrategy tableLayoutStrategy;
+
+    @Override
+    public void init() throws RdfDAOException {
+        try {
+            if (isInitialized()) {
+                throw new IllegalStateException("Already initialized");
+            }
+            checkNotNull(connector);
+            tableLayoutStrategy = conf.getTableLayoutStrategy();
+//            evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable);
+//            conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable);
+
+            TableOperations tos = connector.tableOperations();
+            AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval());
+//            boolean tableExists = tos.exists(evalTable);
+//            if (!tableExists)
+//                tos.create(evalTable);
+            initialized = true;
+        } catch (Exception e) {
+            throw new RdfDAOException(e);
+        }
+    }
+
+    @Override
+    public void destroy() throws RdfDAOException {
+        if (!isInitialized()) {
+            throw new IllegalStateException("Not initialized");
+        }
+        initialized = false;
+    }
+
+    @Override
+    public boolean isInitialized() throws RdfDAOException {
+        return initialized;
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    public AccumuloRdfConfiguration getConf() {
+        return conf;
+    }
+
+    public void setConf(AccumuloRdfConfiguration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public double getCardinality(AccumuloRdfConfiguration conf,
+            mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+            List<Value> val, Resource context) throws RdfDAOException {
+        try {
+            Authorizations authorizations = conf.getAuthorizations();
+            Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations);
+            Text cfTxt = null;
+            if (CARDINALITY_OF.SUBJECT.equals(card)) {
+                cfTxt = SUBJECT_CF_TXT;
+            } else if (CARDINALITY_OF.PREDICATE.equals(card)) {
+                cfTxt = PRED_CF_TXT;
+            } else if (CARDINALITY_OF.OBJECT.equals(card)) {
+//                cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality
+                return Double.MAX_VALUE;
+            } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) {
+                cfTxt = SUBJECTOBJECT_CF_TXT;
+            } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) {
+                cfTxt = SUBJECTPRED_CF_TXT;
+            } else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) {
+                cfTxt = PREDOBJECT_CF_TXT;
+            } else {
+                throw new IllegalArgumentException("Unsupported cardinality type [" + card + "]");
+            }
+            Text cq = EMPTY_TEXT;
+            if (context != null) {
+                cq = new Text(context.stringValue().getBytes());
+            }
+            scanner.fetchColumn(cfTxt, cq);
+            Iterator<Value> vals = val.iterator();
+            String compositeIndex = vals.next().stringValue();
+            while (vals.hasNext()) {
+                compositeIndex += DELIM + vals.next().stringValue();
+            }
+            scanner.setRange(new Range(new Text(compositeIndex.getBytes())));
+            Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter = scanner.iterator();
+            if (iter.hasNext()) {
+                return Double.parseDouble(new String(iter.next().getValue().get()));
+            }
+        } catch (Exception e) {
+            throw new RdfDAOException(e);
+        }
+
+        //default
+        return -1;
+    }
+
+    @Override
+    public double getCardinality(AccumuloRdfConfiguration conf,
+            mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card,
+            List<Value> val) throws RdfDAOException {
+        return getCardinality(conf, card, val, null);
+    }
+}
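The DAO above resolves a cardinality by scanning the eval table for a composite row key, with the column family selected from the CARDINALITY_OF value. A sketch of a typical call (not part of the commit; connector construction and the predicate URI are illustrative):

    import java.util.Collections;
    import java.util.List;

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.AccumuloRdfEvalStatsDAO;
    import mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF;

    import org.apache.accumulo.core.client.Connector;
    import org.openrdf.model.Value;
    import org.openrdf.model.impl.ValueFactoryImpl;

    public class EvalStatsSketch {
        static double predicateCardinality(Connector connector, AccumuloRdfConfiguration conf) throws Exception {
            AccumuloRdfEvalStatsDAO dao = new AccumuloRdfEvalStatsDAO();
            dao.setConnector(connector);
            dao.setConf(conf);
            dao.init(); // creates the eval table on first use

            List<Value> key = Collections.<Value>singletonList(
                    ValueFactoryImpl.getInstance().createURI("urn:example#knows"));
            // Returns the stored count, or -1 when no statistics row exists.
            return dao.getCardinality(conf, CARDINALITY_OF.PREDICATE, key);
        }
    }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
new file mode 100644
index 0000000..13ae37f
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java
@@ -0,0 +1,297 @@
+//package mvm.rya.accumulo;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.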
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +// +//import com.google.common.collect.Iterators; +//import com.google.common.io.ByteArrayDataInput; +//import com.google.common.io.ByteStreams; +//import info.aduna.iteration.CloseableIteration; +//import mvm.rya.api.RdfCloudTripleStoreConstants; +//import mvm.rya.api.RdfCloudTripleStoreUtils; +//import mvm.rya.api.persist.RdfDAOException; +//import mvm.rya.api.utils.NullableStatementImpl; +//import org.apache.accumulo.core.client.*; +//import org.apache.accumulo.core.data.Key; +//import org.apache.accumulo.core.data.Range; +//import org.apache.accumulo.core.iterators.user.AgeOffFilter; +//import org.apache.accumulo.core.iterators.user.TimestampFilter; +//import org.apache.accumulo.core.security.Authorizations; +//import org.apache.hadoop.io.Text; +//import org.openrdf.model.Resource; +//import org.openrdf.model.Statement; +//import org.openrdf.model.URI; +//import org.openrdf.model.Value; +//import org.openrdf.query.BindingSet; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import java.io.IOException; +//import java.util.Collection; +//import java.util.Collections; +//import java.util.HashSet; +//import java.util.Iterator; +//import java.util.Map.Entry; +// +//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; +//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue; +// +//public class AccumuloRdfQueryIterator implements +// CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> { +// +// protected final Logger logger = LoggerFactory.getLogger(getClass()); +// +// private boolean open = false; +// private Iterator result; +// private Resource[] contexts; +// private Collection<Entry<Statement, BindingSet>> statements; +// private int numOfThreads = 20; +// +// private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); +// private ScannerBase scanner; +// private boolean isBatchScanner = true; +// private Statement statement; +// Iterator<BindingSet> iter_bss = null; +// +// private boolean hasNext = true; +// private AccumuloRdfConfiguration conf; +// private TABLE_LAYOUT tableLayout; +// private Text context_txt; +// +// private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory(); +// +// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts) +// throws RdfDAOException { +// this(statements, connector, null, contexts); +// } +// +// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, +// AccumuloRdfConfiguration conf, Resource... 
contexts) +// throws RdfDAOException { +// this.statements = statements; +// this.contexts = contexts; +// this.conf = conf; +// initialize(connector); +// open = true; +// } +// +// public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector, +// AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException { +// this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>( +// new NullableStatementImpl(subject, predicate, object, contexts), +// null)), connector, conf, contexts); +// } +// +// protected void initialize(Connector connector) +// throws RdfDAOException { +// try { +// //TODO: We cannot span multiple tables here +// Collection<Range> ranges = new HashSet<Range>(); +// +// result = Iterators.emptyIterator(); +// Long startTime = conf.getStartTime(); +// Long ttl = conf.getTtl(); +// +// Resource context = null; +// for (Entry<Statement, BindingSet> stmtbs : statements) { +// Statement stmt = stmtbs.getKey(); +// Resource subject = stmt.getSubject(); +// URI predicate = stmt.getPredicate(); +// Value object = stmt.getObject(); +// context = stmt.getContext(); //TODO: assumes the same context for all statements +// logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination"); +// +// Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf); +// tableLayout = entry.getKey(); +//// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange(); +// Range range = entry.getValue(); +// ranges.add(range); +// rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue())); +// } +// +// Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; +// String auth = conf.getAuth(); +// if (auth != null) { +// authorizations = new Authorizations(auth.split(",")); +// } +// String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf); +// result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges); +//// if (isBatchScanner) { +//// ((BatchScanner) scanner).setRanges(ranges); +//// } else { +//// for (Range range : ranges) { +//// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this +//// } +//// } +//// +//// if (isBatchScanner) { +//// result = ((BatchScanner) scanner).iterator(); +//// } else { +//// result = ((Scanner) scanner).iterator(); +//// } +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException { +//// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta) +// if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable +// BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads); +// scannerBase.setRanges(ranges); +// populateScanner(context, startTime, ttl, scannerBase); +// return scannerBase.iterator(); +// } else { +// isBatchScanner = false; +// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()]; +// int i = 0; +// for (Range range : ranges) { +// Scanner scannerBase = connector.createScanner(table, authorizations); +// 
populateScanner(context, startTime, ttl, scannerBase); +// scannerBase.setRange(range); +// iters[i] = scannerBase.iterator(); +// i++; +// scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed +// } +// return Iterators.concat(iters); +// } +// +// } +// +// protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException { +// if (context != null) { //default graph +// context_txt = new Text(writeValue(context)); +// scannerBase.fetchColumnFamily(context_txt); +// } +// +//// if (!isQueryTimeBased(conf)) { +// if (startTime != null && ttl != null) { +//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); +//// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); +//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl); +//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, startTime); +// IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName()); +// TimestampFilter.setStart(setting, startTime, true); +// TimestampFilter.setEnd(setting, startTime + ttl, true); +// scannerBase.addScanIterator(setting); +// } else if (ttl != null) { +//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); +//// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName()); +//// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl); +// IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); +// AgeOffFilter.setTTL(setting, ttl); +// scannerBase.addScanIterator(setting); +// } +//// } +// } +// +// @Override +// public void close() throws RdfDAOException { +// if (!open) +// return; +// verifyIsOpen(); +// open = false; +// if (scanner != null && isBatchScanner) { +// ((BatchScanner) scanner).close(); +// } +// } +// +// public void verifyIsOpen() throws RdfDAOException { +// if (!open) { +// throw new RdfDAOException("Iterator not open"); +// } +// } +// +// @Override +// public boolean hasNext() throws RdfDAOException { +// try { +// if (!open) +// return false; +// verifyIsOpen(); +// /** +// * For some reason, the result.hasNext returns false +// * once at the end of the iterator, and then true +// * for every subsequent call. +// */ +// hasNext = (hasNext && result.hasNext()); +// return hasNext || ((iter_bss != null) && iter_bss.hasNext()); +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// @Override +// public Entry<Statement, BindingSet> next() throws RdfDAOException { +// try { +// if (!this.hasNext()) +// return null; +// +// return getStatement(result, contexts); +// } catch (Exception e) { +// throw new RdfDAOException(e); +// } +// } +// +// public Entry<Statement, BindingSet> getStatement( +// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults, +// Resource... 
filterContexts) throws IOException { +// try { +// while (true) { +// if (iter_bss != null && iter_bss.hasNext()) { +// return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next()); +// } +// +// if (rowResults.hasNext()) { +// Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next(); +// Key key = entry.getKey(); +// ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); +// statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY); +// iter_bss = rangeMap.containsKey(key).iterator(); +// } else +// break; +// } +// } catch (Exception e) { +// throw new IOException(e); +// } +// return null; +// } +// +// @Override +// public void remove() throws RdfDAOException { +// next(); +// } +// +// public int getNumOfThreads() { +// return numOfThreads; +// } +// +// public void setNumOfThreads(int numOfThreads) { +// this.numOfThreads = numOfThreads; +// } +// +// public DefineTripleQueryRangeFactory getQueryRangeFactory() { +// return queryRangeFactory; +// } +// +// public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) { +// this.queryRangeFactory = queryRangeFactory; +// } +//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java new file mode 100644 index 0000000..d3b651f --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java @@ -0,0 +1,71 @@ +package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.resolver.triple.TripleRow; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; + +/** + * Class AccumuloRdfUtils + * Date: Mar 1, 2012 + * Time: 7:15:54 PM + */ +public class AccumuloRdfUtils { + private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class); + + public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { + boolean tableExists = tableOperations.exists(tableName); + if (!tableExists) { + logger.debug("Creating accumulo table: " + tableName); + tableOperations.create(tableName); + } + } + + public static Key from(TripleRow tripleRow) { + return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), + defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); + } + + public static Value extractValue(TripleRow tripleRow) { + return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); + } + + private static byte[] defaultTo(byte[] bytes, byte[] def) { + return bytes != null ? bytes : def; + } + + private static Long defaultTo(Long l, Long def) { + return l != null ? l : def; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java new file mode 100644 index 0000000..50744dd --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java @@ -0,0 +1,522 @@ +package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY; +import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME; +import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; +import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; +import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; +import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; +import info.aduna.iteration.CloseableIteration; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; +import mvm.rya.accumulo.experimental.AccumuloIndexer; +import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Namespace; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +/** + * Class AccumuloRyaDAO + * Date: Feb 29, 2012 + * Time: 12:37:22 PM + */ +public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { + private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); + + private boolean initialized = false; + private Connector connector; + private BatchWriterConfig batchWriterConfig; + + private MultiTableBatchWriter mt_bw; + + // Do not flush these individually + private BatchWriter bw_spo; + private BatchWriter bw_po; + private BatchWriter bw_osp; + + private BatchWriter bw_ns; + + private List<AccumuloIndexer> secondaryIndexers; + + private AccumuloRdfConfiguration conf = new 
AccumuloRdfConfiguration(); + private RyaTableMutationsFactory ryaTableMutationsFactory; + private TableLayoutStrategy tableLayoutStrategy; + private AccumuloRyaQueryEngine queryEngine; + private RyaTripleContext ryaContext; + + @Override + public boolean isInitialized() throws RyaDAOException { + return initialized; + } + + @Override + public void init() throws RyaDAOException { + if (initialized) + return; + try { + checkNotNull(conf); + checkNotNull(connector); + + if(batchWriterConfig == null){ + batchWriterConfig = new BatchWriterConfig(); + batchWriterConfig.setMaxMemory(MAX_MEMORY); + batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS); + batchWriterConfig.setMaxWriteThreads(NUM_THREADS); + } + + tableLayoutStrategy = conf.getTableLayoutStrategy(); + ryaContext = RyaTripleContext.getInstance(conf); + ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext); + + secondaryIndexers = conf.getAdditionalIndexers(); + + TableOperations tableOperations = connector.tableOperations(); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); + AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); + + for (AccumuloIndexer index : secondaryIndexers) { + index.setConf(conf); + } + + mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig); + + //get the batch writers for tables + bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo()); + bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo()); + bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); + + bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, + MAX_TIME, 1); + + for (AccumuloIndexer index : secondaryIndexers) { + index.setMultiTableBatchWriter(mt_bw); + } + + queryEngine = new AccumuloRyaQueryEngine(connector, conf); + + checkVersion(); + + initialized = true; + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + public String getVersion() throws RyaDAOException { + String version = null; + CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); + if (versIter.hasNext()) { + version = versIter.next().getObject().getData(); + } + versIter.close(); + + return version; + } + + @Override + public void add(RyaStatement statement) throws RyaDAOException { + commit(Iterators.singletonIterator(statement)); + } + + @Override + public void add(Iterator<RyaStatement> iter) throws RyaDAOException { + commit(iter); + } + + @Override + public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { + this.delete(Iterators.singletonIterator(stmt), aconf); + //TODO currently all indexers do not support delete + } + + @Override + public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException { + try { + while (statements.hasNext()) { + RyaStatement stmt = statements.next(); + //query first + CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); + while (query.hasNext()) { + deleteSingleRyaStatement(query.next()); + } + } + mt_bw.flush(); + //TODO currently all indexers do not support delete + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void dropGraph(AccumuloRdfConfiguration conf, 
RyaURI... graphs) throws RyaDAOException { + BatchDeleter bd_spo = null; + BatchDeleter bd_po = null; + BatchDeleter bd_osp = null; + + try { + bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations()); + bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations()); + bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations()); + + bd_spo.setRanges(Collections.singleton(new Range())); + bd_po.setRanges(Collections.singleton(new Range())); + bd_osp.setRanges(Collections.singleton(new Range())); + + for (RyaURI graph : graphs){ + bd_spo.fetchColumnFamily(new Text(graph.getData())); + bd_po.fetchColumnFamily(new Text(graph.getData())); + bd_osp.fetchColumnFamily(new Text(graph.getData())); + } + + bd_spo.delete(); + bd_po.delete(); + bd_osp.delete(); + + //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown +// for (AccumuloIndex index : secondaryIndexers) { +// index.dropGraph(graphs); +// } + + } catch (Exception e) { + throw new RyaDAOException(e); + } finally { + if (bd_spo != null) bd_spo.close(); + if (bd_po != null) bd_po.close(); + if (bd_osp != null) bd_osp.close(); + } + + } + + protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException { + Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt); + bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO))); + bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO))); + bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP))); + + } + + protected Mutation deleteMutation(TripleRow tripleRow) { + Mutation m = new Mutation(new Text(tripleRow.getRow())); + + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); + + m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), + tripleRow.getTimestamp()); + return m; + } + + protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { + try { + //TODO: Should have a lock here in case we are adding and committing at the same time + while (commitStatements.hasNext()) { + RyaStatement stmt = commitStatements.next(); + + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); + Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + bw_spo.addMutations(spo); + bw_po.addMutations(po); + bw_osp.addMutations(osp); + + for (AccumuloIndexer index : secondaryIndexers) { + index.storeStatement(stmt); + } + } + + mt_bw.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void destroy() throws RyaDAOException { + if (!initialized) { + return; + } + //TODO: write lock + try { + initialized = false; + mt_bw.flush(); + bw_ns.flush(); + + mt_bw.close(); + bw_ns.close(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void addNamespace(String pfx, String namespace) throws RyaDAOException { + try { + Mutation m = new Mutation(new Text(pfx)); + m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); + bw_ns.addMutation(m); + bw_ns.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public String getNamespace(String pfx) throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); + scanner.setRange(new Range(new Text(pfx))); + Iterator<Map.Entry<Key, Value>> iterator = scanner + .iterator(); + + if (iterator.hasNext()) { + return new String(iterator.next().getValue().get()); + } + } catch (Exception e) { + throw new RyaDAOException(e); + } + return null; + } + + @Override + public void removeNamespace(String pfx) throws RyaDAOException { + try { + Mutation del = new Mutation(new Text(pfx)); + del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); + bw_ns.addMutation(del); + bw_ns.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); + Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); + return new AccumuloNamespaceTableIterator(result); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() { + return this; + } + + @Override + public void purge(RdfCloudTripleStoreConfiguration configuration) { + for (String tableName : getTables()) { + try { + purge(tableName, configuration.getAuths()); + compact(tableName); + } catch (TableNotFoundException e) { + logger.error(e.getMessage()); + } catch (MutationsRejectedException e) { + logger.error(e.getMessage()); + } + } + } + + @Override + public void dropAndDestroy() throws RyaDAOException { + for (String tableName : getTables()) { + try { + drop(tableName); + } catch 
(AccumuloSecurityException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (AccumuloException e) {
+                logger.error(e.getMessage());
+                throw new RyaDAOException(e);
+            } catch (TableNotFoundException e) {
+                logger.warn(e.getMessage());
+            }
+        }
+        destroy();
+    }
+
+    public Connector getConnector() {
+        return connector;
+    }
+
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    public BatchWriterConfig getBatchWriterConfig() {
+        return batchWriterConfig;
+    }
+
+    public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) {
+        this.batchWriterConfig = batchWriterConfig;
+    }
+
+    protected MultiTableBatchWriter getMultiTableBatchWriter() {
+        return mt_bw;
+    }
+
+    public AccumuloRdfConfiguration getConf() {
+        return conf;
+    }
+
+    public void setConf(AccumuloRdfConfiguration conf) {
+        this.conf = conf;
+    }
+
+    public RyaTableMutationsFactory getRyaTableMutationsFactory() {
+        return ryaTableMutationsFactory;
+    }
+
+    public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) {
+        this.ryaTableMutationsFactory = ryaTableMutationsFactory;
+    }
+
+    public AccumuloRyaQueryEngine getQueryEngine() {
+        return queryEngine;
+    }
+
+    public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) {
+        this.queryEngine = queryEngine;
+    }
+
+    protected String[] getTables() {
+        // core tables
+        List<String> tableNames = Lists.newArrayList(
+                tableLayoutStrategy.getSpo(),
+                tableLayoutStrategy.getPo(),
+                tableLayoutStrategy.getOsp(),
+                tableLayoutStrategy.getNs(),
+                tableLayoutStrategy.getEval());
+
+        // Additional Tables
+        for (AccumuloIndexer index : secondaryIndexers) {
+            tableNames.add(index.getTableName());
+        }
+
+        return tableNames.toArray(new String[]{});
+    }
+
+    private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException {
+        if (tableExists(tableName)) {
+            logger.info("Purging accumulo table: " + tableName);
+            BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths));
+            try {
+                batchDeleter.setRanges(Collections.singleton(new Range()));
+                batchDeleter.delete();
+            } finally {
+                batchDeleter.close();
+            }
+        }
+    }
+
+    private void compact(String tableName) {
+        logger.info("Requesting major compaction for table " + tableName);
+        try {
+            connector.tableOperations().compact(tableName, null, null, true, false);
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    private boolean tableExists(String tableName) {
+        return getConnector().tableOperations().exists(tableName);
+    }
+
+    private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException {
+        return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS);
+    }
+
+    private void checkVersion() throws RyaDAOException {
+        String version = getVersion();
+        if (version == null) {
+            this.add(getVersionRyaStatement());
+        }
+        //TODO: Do a version check here
+    }
+
+    protected RyaStatement getVersionRyaStatement() {
+        return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA);
+    }
+
+    private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+        logger.info("Dropping accumulo table: " + tableName);
+        connector.tableOperations().delete(tableName);
+    }
+}
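AccumuloRyaDAO is the main entry point for this module. The sketch below shows the minimal lifecycle a caller goes through (not part of the commit; the MockInstance setup and example URIs are illustrative):

    import mvm.rya.accumulo.AccumuloRdfConfiguration;
    import mvm.rya.accumulo.AccumuloRyaDAO;
    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaURI;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.mock.MockInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class RyaDaoSketch {
        public static void main(String[] args) throws Exception {
            Connector connector = new MockInstance("dev").getConnector("root", new PasswordToken(""));

            AccumuloRyaDAO dao = new AccumuloRyaDAO();
            dao.setConnector(connector);
            dao.setConf(new AccumuloRdfConfiguration());
            dao.init();   // creates the spo/po/osp/ns tables and writes the version row if absent

            dao.add(new RyaStatement(
                    new RyaURI("urn:example#alice"),
                    new RyaURI("urn:example#knows"),
                    new RyaURI("urn:example#bob")));

            dao.destroy(); // flushes and closes the underlying batch writers
        }
    }

Note that add() fans each statement out to all three core tables through RyaTableMutationsFactory (shown later in this commit) and to every configured secondary indexer.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java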
---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java new file mode 100644 index 0000000..3e51ab5 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java @@ -0,0 +1,152 @@ +//package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +// +//import com.google.common.io.ByteArrayDataOutput; +//import com.google.common.io.ByteStreams; +//import mvm.rya.api.RdfCloudTripleStoreUtils; +//import mvm.rya.api.domain.RangeValue; +//import org.apache.accumulo.core.data.Range; +//import org.apache.hadoop.io.Text; +//import org.openrdf.model.Value; +//import org.openrdf.model.ValueFactory; +//import org.openrdf.model.impl.ValueFactoryImpl; +// +//import java.io.IOException; +//import java.util.Map; +// +//import static mvm.rya.api.RdfCloudTripleStoreConstants.*; +//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry; +// +///** +// * Class DefineTripleQueryRangeFactory +// * Date: Jun 2, 2011 +// * Time: 10:35:43 AM +// */ +//public class DefineTripleQueryRangeFactory { +// +// ValueFactory vf = ValueFactoryImpl.getInstance(); +// +// protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty) +// throws IOException { +// if(!empty) { +// startRowOut.write(DELIM_BYTES); +// endRowOut.write(DELIM_BYTES); +// } +// //null check? 
+// if(val instanceof RangeValue) { +// RangeValue rangeValue = (RangeValue) val; +// Value start = rangeValue.getStart(); +// Value end = rangeValue.getEnd(); +// byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start); +// byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end); +// startRowOut.write(start_val_bytes); +// endRowOut.write(end_val_bytes); +// } else { +// byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val); +// startRowOut.write(val_bytes); +// endRowOut.write(val_bytes); +// } +// } +// +// public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf) +// throws IOException { +// +// byte[] startrow, stoprow; +// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); +// ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput(); +// Range range; +// TABLE_LAYOUT tableLayout; +// +// if (subject != null) { +// /** +// * Case: s +// * Table: spo +// * Want this to be the first if statement since it will be most likely the most asked for table +// */ +// tableLayout = TABLE_LAYOUT.SPO; +// fillRange(startRowOut, stopRowOut, subject, true); +// if (predicate != null) { +// /** +// * Case: sp +// * Table: spo +// */ +// fillRange(startRowOut, stopRowOut, predicate, false); +// if (object != null) { +// /** +// * Case: spo +// * Table: spo +// */ +// fillRange(startRowOut, stopRowOut, object, false); +// } +// } else if (object != null) { +// /** +// * Case: so +// * Table: osp +// * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement +// * for best performance. The SPO table probably gets the most scans, so I want it to be the first if +// * statement in the branch. +// */ +// tableLayout = TABLE_LAYOUT.OSP; +// startRowOut = ByteStreams.newDataOutput(); +// stopRowOut = ByteStreams.newDataOutput(); +// fillRange(startRowOut, stopRowOut, object, true); +// fillRange(startRowOut, stopRowOut, subject, false); +// } +// } else if (predicate != null) { +// /** +// * Case: p +// * Table: po +// * Wanted this to be the second if statement, since it will be the second most asked for table +// */ +// tableLayout = TABLE_LAYOUT.PO; +// fillRange(startRowOut, stopRowOut, predicate, true); +// if (object != null) { +// /** +// * Case: po +// * Table: po +// */ +// fillRange(startRowOut, stopRowOut, object, false); +// } +// } else if (object != null) { +// /** +// * Case: o +// * Table: osp +// * Probably a pretty rare scenario +// */ +// tableLayout = TABLE_LAYOUT.OSP; +// fillRange(startRowOut, stopRowOut, object, true); +// } else { +// tableLayout = TABLE_LAYOUT.SPO; +// stopRowOut.write(Byte.MAX_VALUE); +// } +// +// startrow = startRowOut.toByteArray(); +// stopRowOut.write(DELIM_STOP_BYTES); +// stoprow = stopRowOut.toByteArray(); +// Text startRowTxt = new Text(startrow); +// Text stopRowTxt = new Text(stoprow); +// range = new Range(startRowTxt, stopRowTxt); +// +// return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range); +// } +// +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java new file mode 100644 index 0000000..ee217be --- /dev/null +++ 
b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java @@ -0,0 +1,114 @@ +package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; + +import java.io.IOException; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class RyaTableKeyValues { + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); + public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); + + RyaTripleContext instance; + + private RyaStatement stmt; + private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); + + public RyaTableKeyValues(RyaStatement stmt, RdfCloudTripleStoreConfiguration conf) { + this.stmt = stmt; + this.instance = RyaTripleContext.getInstance(conf); + } + + public Collection<Map.Entry<Key, Value>> getSpo() { + return spo; + } + + public Collection<Map.Entry<Key, Value>> getPo() { + return po; + } + + public Collection<Map.Entry<Key, Value>> getOsp() { + return osp; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public RyaTableKeyValues invoke() throws IOException { + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? + */try { + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + timestamp = timestamp == null ? 0l : timestamp; + byte[] value = tripleRow.getValue(); + Value v = value == null ? 
EMPTY_VALUE : new Value(value); + spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + } catch (TripleRowResolverException e) { + throw new IOException(e); + } + return this; + } + + @Override + public String toString() { + return "RyaTableKeyValues{" + + "statement=" + stmt + + ", spo=" + spo + + ", po=" + po + + ", o=" + osp + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java new file mode 100644 index 0000000..094da63 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java @@ -0,0 +1,101 @@ +package mvm.rya.accumulo; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; +import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.hadoop.io.Text; + +public class RyaTableMutationsFactory { + + RyaTripleContext ryaContext; + + public RyaTableMutationsFactory(RyaTripleContext ryaContext) { + this.ryaContext = ryaContext; + } + + //TODO: Does this still need to be collections + public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( + RyaStatement stmt) throws IOException { + + Collection<Mutation> spo_muts = new ArrayList<Mutation>(); + Collection<Mutation> po_muts = new ArrayList<Mutation>(); + Collection<Mutation> osp_muts = new ArrayList<Mutation>(); + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? + */ + try { + Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); + spo_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.PO); + po_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.OSP); + osp_muts.add(createMutation(tripleRow)); + } catch (TripleRowResolverException fe) { + throw new IOException(fe); + } + + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = + new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); + + return mutations; + } + + protected Mutation createMutation(TripleRow tripleRow) { + Mutation mutation = new Mutation(new Text(tripleRow.getRow())); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + byte[] value = tripleRow.getValue(); + Value v = value == null ? EMPTY_VALUE : new Value(value); + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? 
EMPTY_TEXT : new Text(columnFamily);
+
+        mutation.put(cfText, cqText, cv, timestamp, v);
+        return mutation;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
new file mode 100644
index 0000000..2244f66
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java
@@ -0,0 +1,39 @@
+package mvm.rya.accumulo.experimental;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public abstract class AbstractAccumuloIndexer implements AccumuloIndexer {
+
+    @Override
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException {
+    }
+
+    @Override
+    public void storeStatements(Collection<RyaStatement> statements) throws IOException {
+        for (RyaStatement s : statements) {
+            storeStatement(s);
+        }
+    }
+
+    @Override
+    public void deleteStatement(RyaStatement stmt) throws IOException {
+    }
+
+    @Override
+    public void dropGraph(RyaURI... graphs) {
+    }
+
+    @Override
+    public void flush() throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
new file mode 100644
index 0000000..1bd75ba
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java
@@ -0,0 +1,13 @@
+package mvm.rya.accumulo.experimental;
+
+import java.io.IOException;
+
+import mvm.rya.api.persist.index.RyaSecondaryIndexer;
+
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+
+public interface AccumuloIndexer extends RyaSecondaryIndexer {
+
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException;
+
+}
\ No newline at end of file
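AccumuloRyaDAO treats these indexers as a plug-in point: during init() it calls setConf() and setMultiTableBatchWriter() on each one, and commit() fans every statement out through storeStatement(). A sketch of a minimal custom indexer (hypothetical; it assumes the remaining RyaSecondaryIndexer methods are the ones AccumuloRyaDAO is seen calling — setConf, getConf, getTableName, storeStatement — so any further interface methods would need stubs as well):

    import java.io.IOException;

    import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
    import mvm.rya.api.domain.RyaStatement;

    import org.apache.hadoop.conf.Configuration;

    public class LoggingIndexer extends AbstractAccumuloIndexer {
        private Configuration conf;

        public void setConf(Configuration conf) { this.conf = conf; }

        public Configuration getConf() { return conf; }

        public String getTableName() { return "rya_logging_index"; } // illustrative name

        // Called once per statement by AccumuloRyaDAO.commit().
        public void storeStatement(RyaStatement stmt) throws IOException {
            System.out.println("indexing: " + stmt);
        }
    }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
new file mode 100644
index 0000000..a148e6b
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
@@ -0,0 +1,163 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.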
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
new file mode 100644
index 0000000..a148e6b
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
@@ -0,0 +1,163 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Base class for MapReduce tools that read from and write to the Rya Accumulo tables.
+ */
+public abstract class AbstractAccumuloMRTool {
+
+    protected Configuration conf;
+    protected RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+    protected String userName = "root";
+    protected String pwd = "root";
+    protected String instance = "instance";
+    protected String zk = "zoo";
+    protected Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+    protected String ttl = null;
+    protected boolean mock = false;
+    protected boolean hdfsInput = false;
+    protected String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+
+    protected void init() {
+        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
+        ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
+        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
+        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
+        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
+        mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, mock);
+        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, hdfsInput);
+        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+        if (tablePrefix != null)
+            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+        String auth = conf.get(MRUtils.AC_AUTH_PROP);
+        if (auth != null)
+            authorizations = new Authorizations(auth.split(","));
+
+        if (!mock) {
+            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+            conf.set("io.sort.mb", "256");
+        }
+    }
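Before init() runs, a driver is expected to have populated these keys on the Hadoop Configuration. A minimal sketch; the values are placeholders, and only the keys shown symbolically via MRUtils (as used in init() above) are grounded in this commit:

    // Placeholder connection values; the same keys can be supplied with -D
    // options when the tool is launched through ToolRunner.
    Configuration conf = new Configuration();
    conf.set(MRUtils.AC_INSTANCE_PROP, "myInstance");
    conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181,zoo2:2181");
    conf.set(MRUtils.AC_USERNAME_PROP, "root");
    conf.set(MRUtils.AC_PWD_PROP, "secret");
    conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "triplestore_");
    tool.setConf(conf); // tool is any AbstractAccumuloMRTool subclass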
+
+    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
+        // set up accumulo input
+        if (!hdfsInput) {
+            job.setInputFormatClass(AccumuloInputFormat.class);
+        } else {
+            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+        }
+        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
+        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+        if (!mock) {
+            AccumuloInputFormat.setZooKeeperInstance(job, instance, zk);
+        } else {
+            AccumuloInputFormat.setMockInstance(job, instance);
+        }
+        if (ttl != null) {
+            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
+            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+            AccumuloInputFormat.addIterator(job, setting);
+        }
+    }
+
+    protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException {
+        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+        if (!mock) {
+            AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
+        } else {
+            AccumuloOutputFormat.setMockInstance(job, instance);
+        }
+        job.setOutputFormatClass(AccumuloOutputFormat.class);
+    }
+
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+    }
+
+    public Configuration getConf() {
+        return conf;
+    }
+
+    public String getInstance() {
+        return instance;
+    }
+
+    public void setInstance(String instance) {
+        this.instance = instance;
+    }
+
+    public String getPwd() {
+        return pwd;
+    }
+
+    public void setPwd(String pwd) {
+        this.pwd = pwd;
+    }
+
+    public String getZk() {
+        return zk;
+    }
+
+    public void setZk(String zk) {
+        this.zk = zk;
+    }
+
+    public String getTtl() {
+        return ttl;
+    }
+
+    public void setTtl(String ttl) {
+        this.ttl = ttl;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+}
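To make the intended call sequence concrete, here is a hypothetical skeleton of a subclass (class name, table name, and the elided job wiring are illustrative, not part of this commit); AccumuloRdfCountTool below follows the same pattern:

    // Illustrative skeleton of a tool built on AbstractAccumuloMRTool.
    public class MyRyaTool extends AbstractAccumuloMRTool implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            init();                               // pull connection settings from conf
            Job job = new Job(conf);
            job.setJarByClass(MyRyaTool.class);
            setupInputFormat(job);                // Accumulo scan or HDFS-backed input
            // ... set mapper/reducer and map-output key/value classes here ...
            setupOutputFormat(job, "my_output");  // placeholder output table
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }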
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
new file mode 100644
index 0000000..0818b5a
--- /dev/null
+++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
@@ -0,0 +1,257 @@
+package mvm.rya.accumulo.mr.eval;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.rya
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Date;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+
+/**
+ * Counts occurrences of each statement piece (subject and predicate; object
+ * counting is currently commented out in the mapper) and saves the totals to
+ * the eval stats table.
+ * Class RdfCloudTripleStoreCountTool
+ * Date: Apr 12, 2011
+ * Time: 10:39:40 AM
+ * @deprecated
+ */
+public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
+
+    public static void main(String[] args) {
+        try {
+            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Accumulo (formerly Cloudbase) connection properties are read from the Configuration in init().
+     */
+    @Override
+    public int run(String[] strings) throws Exception {
+        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
+
+        // initialize connection settings from the configuration
+        init();
+
+        Job job = new Job(conf);
+        job.setJarByClass(AccumuloRdfCountTool.class);
+        setupInputFormat(job);
+
+        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
+        // set input output of the particular job
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(LongWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+
+        // set mapper and reducer classes
+        job.setMapperClass(CountPiecesMapper.class);
+        job.setCombinerClass(CountPiecesCombiner.class);
+        job.setReducerClass(CountPiecesReducer.class);
+
+        String outputTable = tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
+        setupOutputFormat(job, outputTable);
+
+        // Submit the job
+        Date startTime = new Date();
+        System.out.println("Job started: " + startTime);
+        int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+        if (exitCode == 0) {
+            Date end_time = new Date();
+            System.out.println("Job ended: " + end_time);
+            System.out.println("The job took "
+                    + (end_time.getTime() - startTime.getTime()) / 1000
+                    + " seconds.");
+            return 0;
+        } else {
+            System.out.println("Job Failed!!!");
+        }
+
+        return -1;
+    }
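Because run() reads everything from the Configuration, the (deprecated) tool can also be launched programmatically; a minimal sketch with placeholder values, using the same MRUtils keys init() consumes:

    // Sketch only; the same keys can alternatively be passed as -D options
    // on the hadoop command line when invoking the tool's main().
    Configuration conf = new Configuration();
    conf.set(MRUtils.AC_INSTANCE_PROP, "myInstance");
    conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181");
    conf.set(MRUtils.AC_USERNAME_PROP, "root");
    conf.set(MRUtils.AC_PWD_PROP, "secret");
    int rc = ToolRunner.run(conf, new AccumuloRdfCountTool(), new String[]{});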
+
+    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {
+
+        public static final byte[] EMPTY_BYTES = new byte[0];
+        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
+
+        ValueFactoryImpl vf = new ValueFactoryImpl();
+
+        private Text keyOut = new Text();
+        private LongWritable valOut = new LongWritable(1);
+        private RyaTripleContext ryaContext;
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
+                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
+            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+        }
+
+        @Override
+        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+            try {
+                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
+                // count each piece: subject, predicate, object
+                String subj = statement.getSubject().getData();
+                String pred = statement.getPredicate().getData();
+//                byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
+                RyaURI scontext = statement.getContext();
+                boolean includesContext = scontext != null;
+                String scontext_str = (includesContext) ? scontext.getData() : null;
+
+                ByteArrayDataOutput output = ByteStreams.newDataOutput();
+                output.writeUTF(subj);
+                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+
+                output = ByteStreams.newDataOutput();
+                output.writeUTF(pred);
+                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
+                output.writeBoolean(includesContext);
+                if (includesContext)
+                    output.writeUTF(scontext_str);
+                keyOut.set(output.toByteArray());
+                context.write(keyOut, valOut);
+            } catch (TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+        }
+    }
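The map-output key is a composite: writeUTF(piece), writeUTF(column family), writeBoolean(context flag), then the context URI when present; the reducer below unpacks it in the same order. A round-trip sketch with hypothetical helper names, mirroring the encoding in map():

    // Illustrative helpers only; not part of the commit.
    static byte[] encodePieceKey(String piece, String family, String context) {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        out.writeUTF(piece);               // subject or predicate data
        out.writeUTF(family);              // e.g. SUBJECT_CF or PRED_CF
        out.writeBoolean(context != null); // context flag
        if (context != null)
            out.writeUTF(context);         // named-graph URI
        return out.toByteArray();
    }

    static String[] decodePieceKey(byte[] key) {
        ByteArrayDataInput in = ByteStreams.newDataInput(key);
        String piece = in.readUTF();
        String family = in.readUTF();
        String context = in.readBoolean() ? in.readUTF() : null;
        return new String[] { piece, family, context };
    }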
+
+    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
+
+        private LongWritable valOut = new LongWritable();
+
+        // TODO: partial sums emitted by combiners can still add up to a large total
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 2;
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            valOut.set(count);
+            context.write(key, valOut);
+        }
+
+    }
+
+    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {
+
+        Text row = new Text();
+        Text cat_txt = new Text();
+        Value v_out = new Value();
+        ValueFactory vf = new ValueFactoryImpl();
+
+        // any count lower than this does not need to be saved
+        public static final int TOO_LOW = 10;
+        private String tablePrefix;
+        protected Text table;
+        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP);
+            if (cv_s != null)
+                cv = new ColumnVisibility(cv_s);
+        }
+
+        @Override
+        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+            long count = 0;
+            for (LongWritable lw : values) {
+                count += lw.get();
+            }
+
+            if (count <= TOO_LOW)
+                return;
+
+            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
+            String v = badi.readUTF();
+            cat_txt.set(badi.readUTF());
+
+            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
+            boolean includesContext = badi.readBoolean();
+            if (includesContext) {
+                columnQualifier = new Text(badi.readUTF());
+            }
+
+            row.set(v);
+            Mutation m = new Mutation(row);
+            v_out.set((count + "").getBytes());
+            m.put(cat_txt, columnQualifier, cv, v_out);
+            context.write(table, m);
+        }
+
+    }
+}
\ No newline at end of file
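After a successful run, the totals land in the eval stats table (tablePrefix + TBL_EVAL_SUFFIX) as row = piece, column family = category, value = count. A minimal read-back sketch, assuming an existing Connector, resolved table name, and authorizations from the surrounding code:

    // Sketch: scan the eval stats table this job populates.
    Scanner scanner = connector.createScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, authorizations);
    for (Map.Entry<Key, Value> entry : scanner) {
        String piece = entry.getKey().getRow().toString();             // subject or predicate
        String category = entry.getKey().getColumnFamily().toString(); // e.g. SUBJECT_CF, PRED_CF
        long count = Long.parseLong(entry.getValue().toString());
        System.out.println(category + " " + piece + " = " + count);
    }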
