http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java deleted file mode 100644 index a3e0677..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java +++ /dev/null @@ -1,173 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreStatement; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RdfDAOException; -import mvm.rya.api.persist.RdfEvalStatsDAO; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Resource; -import org.openrdf.model.Value; - -/** - * Class CloudbaseRdfEvalStatsDAO - * Date: Feb 28, 2012 - * Time: 5:03:16 PM - */ -public class AccumuloRdfEvalStatsDAO implements RdfEvalStatsDAO<AccumuloRdfConfiguration> { - - private boolean initialized = false; - private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - - private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>(); - private Connector connector; - - // private String evalTable = TBL_EVAL; - private TableLayoutStrategy tableLayoutStrategy; - - @Override - public void init() throws RdfDAOException { - try { - if (isInitialized()) { - throw new IllegalStateException("Already 
initialized"); - } - checkNotNull(connector); - tableLayoutStrategy = conf.getTableLayoutStrategy(); -// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); -// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); - - TableOperations tos = connector.tableOperations(); - AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval()); -// boolean tableExists = tos.exists(evalTable); -// if (!tableExists) -// tos.create(evalTable); - initialized = true; - } catch (Exception e) { - throw new RdfDAOException(e); - } - } - - - @Override - public void destroy() throws RdfDAOException { - if (!isInitialized()) { - throw new IllegalStateException("Not initialized"); - } - initialized = false; - } - - @Override - public boolean isInitialized() throws RdfDAOException { - return initialized; - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - public AccumuloRdfConfiguration getConf() { - return conf; - } - - public void setConf(AccumuloRdfConfiguration conf) { - this.conf = conf; - } - - @Override - public double getCardinality(AccumuloRdfConfiguration conf, - mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, - List<Value> val, Resource context) throws RdfDAOException { - try { - Authorizations authorizations = conf.getAuthorizations(); - Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations); - Text cfTxt = null; - if (CARDINALITY_OF.SUBJECT.equals(card)) { - cfTxt = SUBJECT_CF_TXT; - } else if (CARDINALITY_OF.PREDICATE.equals(card)) { - cfTxt = PRED_CF_TXT; - } else if (CARDINALITY_OF.OBJECT.equals(card)) { -// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality - return Double.MAX_VALUE; - } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) { - cfTxt = SUBJECTOBJECT_CF_TXT; - } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) { - cfTxt = SUBJECTPRED_CF_TXT; - 
} else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) { - cfTxt = PREDOBJECT_CF_TXT; - } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]"); - Text cq = EMPTY_TEXT; - if (context != null) { - cq = new Text(context.stringValue().getBytes()); - } - scanner.fetchColumn(cfTxt, cq); - Iterator<Value> vals = val.iterator(); - String compositeIndex = vals.next().stringValue(); - while (vals.hasNext()){ - compositeIndex += DELIM + vals.next().stringValue(); - } - scanner.setRange(new Range(new Text(compositeIndex.getBytes()))); - Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter = scanner.iterator(); - if (iter.hasNext()) { - return Double.parseDouble(new String(iter.next().getValue().get())); - } - } catch (Exception e) { - throw new RdfDAOException(e); - } - - //default - return -1; - } - - @Override - public double getCardinality(AccumuloRdfConfiguration conf, - mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, - List<Value> val) throws RdfDAOException { - return getCardinality(conf, card, val, null); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java deleted file mode 100644 index d13f50e..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -//package mvm.rya.accumulo; - -// -//import com.google.common.collect.Iterators; -//import com.google.common.io.ByteArrayDataInput; -//import com.google.common.io.ByteStreams; -//import info.aduna.iteration.CloseableIteration; -//import mvm.rya.api.RdfCloudTripleStoreConstants; -//import mvm.rya.api.RdfCloudTripleStoreUtils; -//import mvm.rya.api.persist.RdfDAOException; -//import mvm.rya.api.utils.NullableStatementImpl; -//import org.apache.accumulo.core.client.*; -//import org.apache.accumulo.core.data.Key; -//import org.apache.accumulo.core.data.Range; -//import org.apache.accumulo.core.iterators.user.AgeOffFilter; -//import org.apache.accumulo.core.iterators.user.TimestampFilter; -//import org.apache.accumulo.core.security.Authorizations; -//import org.apache.hadoop.io.Text; -//import org.openrdf.model.Resource; -//import org.openrdf.model.Statement; -//import org.openrdf.model.URI; -//import org.openrdf.model.Value; -//import org.openrdf.query.BindingSet; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.IOException; -//import java.util.Collection; -//import java.util.Collections; -//import java.util.HashSet; -//import java.util.Iterator; -//import java.util.Map.Entry; -// -//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; -//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue; -// -//public class AccumuloRdfQueryIterator implements -// CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> { -// -// protected final Logger logger = LoggerFactory.getLogger(getClass()); -// -// private boolean open = false; -// private Iterator result; -// private Resource[] contexts; -// private Collection<Entry<Statement, BindingSet>> statements; -// private int numOfThreads = 20; -// -// private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); -// private ScannerBase scanner; -// private boolean 
isBatchScanner = true; -// private Statement statement; -// Iterator<BindingSet> iter_bss = null; -// -// private boolean hasNext = true; -// private AccumuloRdfConfiguration conf; -// private TABLE_LAYOUT tableLayout; -// private Text context_txt; -// -// private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory(); -// -// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts) -// throws RdfDAOException { -// this(statements, connector, null, contexts); -// } -// -// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, -// AccumuloRdfConfiguration conf, Resource... contexts) -// throws RdfDAOException { -// this.statements = statements; -// this.contexts = contexts; -// this.conf = conf; -// initialize(connector); -// open = true; -// } -// -// public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector, -// AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException { -// this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>( -// new NullableStatementImpl(subject, predicate, object, contexts), -// null)), connector, conf, contexts); -// } -// -// protected void initialize(Connector connector) -// throws RdfDAOException { -// try { -// //TODO: We cannot span multiple tables here -// Collection<Range> ranges = new HashSet<Range>(); -// -// result = Iterators.emptyIterator(); -// Long startTime = conf.getStartTime(); -// Long ttl = conf.getTtl(); -// -// Resource context = null; -// for (Entry<Statement, BindingSet> stmtbs : statements) { -// Statement stmt = stmtbs.getKey(); -// Resource subject = stmt.getSubject(); -// URI predicate = stmt.getPredicate(); -// Value object = stmt.getObject(); -// context = stmt.getContext(); //TODO: assumes the same context for all statements -// 
logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination"); -// -// Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf); -// tableLayout = entry.getKey(); -//// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange(); -// Range range = entry.getValue(); -// ranges.add(range); -// rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue())); -// } -// -// Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; -// String auth = conf.getAuth(); -// if (auth != null) { -// authorizations = new Authorizations(auth.split(",")); -// } -// String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf); -// result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges); -//// if (isBatchScanner) { -//// ((BatchScanner) scanner).setRanges(ranges); -//// } else { -//// for (Range range : ranges) { -//// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this -//// } -//// } -//// -//// if (isBatchScanner) { -//// result = ((BatchScanner) scanner).iterator(); -//// } else { -//// result = ((Scanner) scanner).iterator(); -//// } -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException { -//// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta) -// if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable -// BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads); -// scannerBase.setRanges(ranges); -// populateScanner(context, startTime, ttl, scannerBase); -// return 
scannerBase.iterator(); -// } else { -// isBatchScanner = false; -// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()]; -// int i = 0; -// for (Range range : ranges) { -// Scanner scannerBase = connector.createScanner(table, authorizations); -// populateScanner(context, startTime, ttl, scannerBase); -// scannerBase.setRange(range); -// iters[i] = scannerBase.iterator(); -// i++; -// scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed -// } -// return Iterators.concat(iters); -// } -// -// } -// -// protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException { -// if (context != null) { //default graph -// context_txt = new Text(writeValue(context)); -// scannerBase.fetchColumnFamily(context_txt); -// } -// -//// if (!isQueryTimeBased(conf)) { -// if (startTime != null && ttl != null) { -//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); -//// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); -//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl); -//// scannerBase.setScanIteratorOption("filteringIterator", "0." 
+ TimeRangeFilter.START_TIME_PROP, startTime); -// IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName()); -// TimestampFilter.setStart(setting, startTime, true); -// TimestampFilter.setEnd(setting, startTime + ttl, true); -// scannerBase.addScanIterator(setting); -// } else if (ttl != null) { -//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); -//// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName()); -//// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl); -// IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); -// AgeOffFilter.setTTL(setting, ttl); -// scannerBase.addScanIterator(setting); -// } -//// } -// } -// -// @Override -// public void close() throws RdfDAOException { -// if (!open) -// return; -// verifyIsOpen(); -// open = false; -// if (scanner != null && isBatchScanner) { -// ((BatchScanner) scanner).close(); -// } -// } -// -// public void verifyIsOpen() throws RdfDAOException { -// if (!open) { -// throw new RdfDAOException("Iterator not open"); -// } -// } -// -// @Override -// public boolean hasNext() throws RdfDAOException { -// try { -// if (!open) -// return false; -// verifyIsOpen(); -// /** -// * For some reason, the result.hasNext returns false -// * once at the end of the iterator, and then true -// * for every subsequent call. 
-// */ -// hasNext = (hasNext && result.hasNext()); -// return hasNext || ((iter_bss != null) && iter_bss.hasNext()); -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// @Override -// public Entry<Statement, BindingSet> next() throws RdfDAOException { -// try { -// if (!this.hasNext()) -// return null; -// -// return getStatement(result, contexts); -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// public Entry<Statement, BindingSet> getStatement( -// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults, -// Resource... filterContexts) throws IOException { -// try { -// while (true) { -// if (iter_bss != null && iter_bss.hasNext()) { -// return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next()); -// } -// -// if (rowResults.hasNext()) { -// Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next(); -// Key key = entry.getKey(); -// ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); -// statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY); -// iter_bss = rangeMap.containsKey(key).iterator(); -// } else -// break; -// } -// } catch (Exception e) { -// throw new IOException(e); -// } -// return null; -// } -// -// @Override -// public void remove() throws RdfDAOException { -// next(); -// } -// -// public int getNumOfThreads() { -// return numOfThreads; -// } -// -// public void setNumOfThreads(int numOfThreads) { -// this.numOfThreads = numOfThreads; -// } -// -// public DefineTripleQueryRangeFactory getQueryRangeFactory() { -// return queryRangeFactory; -// } -// -// public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) { -// this.queryRangeFactory = queryRangeFactory; -// } -//} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java deleted file mode 100644 index 157fc5a..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.resolver.triple.TripleRow; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; - -/** - * Class AccumuloRdfUtils - * Date: Mar 1, 2012 - * Time: 7:15:54 PM - */ -public class AccumuloRdfUtils { - private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class); - - public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { - boolean tableExists = tableOperations.exists(tableName); - if (!tableExists) { - logger.debug("Creating accumulo table: " + tableName); - tableOperations.create(tableName); - } - } - - public static Key from(TripleRow tripleRow) { - return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), - defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); - } - - public static Value extractValue(TripleRow tripleRow) { - return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); - } - - private static byte[] defaultTo(byte[] bytes, byte[] def) { - return bytes != null ? bytes : def; - } - - private static Long defaultTo(Long l, Long def) { - return l != null ? 
l : def; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java deleted file mode 100644 index 764ca80..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java +++ /dev/null @@ -1,523 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY; -import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME; -import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; -import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; -import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; -import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.RyaNamespaceManager; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchDeleter; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; 
-import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Namespace; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; - -/** - * Class AccumuloRyaDAO - * Date: Feb 29, 2012 - * Time: 12:37:22 PM - */ -public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { - private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); - - private boolean initialized = false; - private Connector connector; - private BatchWriterConfig batchWriterConfig; - - private MultiTableBatchWriter mt_bw; - - // Do not flush these individually - private BatchWriter bw_spo; - private BatchWriter bw_po; - private BatchWriter bw_osp; - - private BatchWriter bw_ns; - - private List<AccumuloIndexer> secondaryIndexers; - - private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - private RyaTableMutationsFactory ryaTableMutationsFactory; - private TableLayoutStrategy tableLayoutStrategy; - private AccumuloRyaQueryEngine queryEngine; - private RyaTripleContext ryaContext; - - @Override - public boolean isInitialized() throws RyaDAOException { - return initialized; - } - - @Override - public void init() throws RyaDAOException { - if (initialized) - return; - try { - checkNotNull(conf); - 
checkNotNull(connector); - - if(batchWriterConfig == null){ - batchWriterConfig = new BatchWriterConfig(); - batchWriterConfig.setMaxMemory(MAX_MEMORY); - batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS); - batchWriterConfig.setMaxWriteThreads(NUM_THREADS); - } - - tableLayoutStrategy = conf.getTableLayoutStrategy(); - ryaContext = RyaTripleContext.getInstance(conf); - ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext); - - secondaryIndexers = conf.getAdditionalIndexers(); - - TableOperations tableOperations = connector.tableOperations(); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); - - for (AccumuloIndexer index : secondaryIndexers) { - index.setConf(conf); - } - - mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig); - - //get the batch writers for tables - bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo()); - bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo()); - bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); - - bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, - MAX_TIME, 1); - - for (AccumuloIndexer index : secondaryIndexers) { - index.setMultiTableBatchWriter(mt_bw); - } - - queryEngine = new AccumuloRyaQueryEngine(connector, conf); - - checkVersion(); - - initialized = true; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - public String getVersion() throws RyaDAOException { - String version = null; - CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); - if (versIter.hasNext()) { - version = versIter.next().getObject().getData(); - } - 
versIter.close(); - - return version; - } - - @Override - public void add(RyaStatement statement) throws RyaDAOException { - commit(Iterators.singletonIterator(statement)); - } - - @Override - public void add(Iterator<RyaStatement> iter) throws RyaDAOException { - commit(iter); - } - - @Override - public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { - this.delete(Iterators.singletonIterator(stmt), aconf); - //TODO currently all indexers do not support delete - } - - @Override - public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException { - try { - while (statements.hasNext()) { - RyaStatement stmt = statements.next(); - //query first - CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); - while (query.hasNext()) { - deleteSingleRyaStatement(query.next()); - } - } - mt_bw.flush(); - //TODO currently all indexers do not support delete - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... 
graphs) throws RyaDAOException { - BatchDeleter bd_spo = null; - BatchDeleter bd_po = null; - BatchDeleter bd_osp = null; - - try { - bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations()); - bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations()); - bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations()); - - bd_spo.setRanges(Collections.singleton(new Range())); - bd_po.setRanges(Collections.singleton(new Range())); - bd_osp.setRanges(Collections.singleton(new Range())); - - for (RyaURI graph : graphs){ - bd_spo.fetchColumnFamily(new Text(graph.getData())); - bd_po.fetchColumnFamily(new Text(graph.getData())); - bd_osp.fetchColumnFamily(new Text(graph.getData())); - } - - bd_spo.delete(); - bd_po.delete(); - bd_osp.delete(); - - //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown -// for (AccumuloIndex index : secondaryIndexers) { -// index.dropGraph(graphs); -// } - - } catch (Exception e) { - throw new RyaDAOException(e); - } finally { - if (bd_spo != null) bd_spo.close(); - if (bd_po != null) bd_po.close(); - if (bd_osp != null) bd_osp.close(); - } - - } - - protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException { - Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt); - bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO))); - bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO))); - bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP))); - - } - - protected Mutation deleteMutation(TripleRow tripleRow) { - Mutation m = new Mutation(new Text(tripleRow.getRow())); - - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); - - m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), - tripleRow.getTimestamp()); - return m; - } - - protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { - try { - //TODO: Should have a lock here in case we are adding and committing at the same time - while (commitStatements.hasNext()) { - RyaStatement stmt = commitStatements.next(); - - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); - bw_spo.addMutations(spo); - bw_po.addMutations(po); - bw_osp.addMutations(osp); - - for (AccumuloIndexer index : secondaryIndexers) { - index.storeStatement(stmt); - } - } - - mt_bw.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void destroy() throws RyaDAOException { - if (!initialized) { - return; - } - //TODO: write lock - try { - initialized = false; - mt_bw.flush(); - bw_ns.flush(); - - mt_bw.close(); - bw_ns.close(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void addNamespace(String pfx, String namespace) throws RyaDAOException { - try { - Mutation m = new Mutation(new Text(pfx)); - m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); - bw_ns.addMutation(m); - bw_ns.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public String getNamespace(String pfx) throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); - scanner.setRange(new Range(new Text(pfx))); - Iterator<Map.Entry<Key, Value>> iterator = scanner - .iterator(); - - if (iterator.hasNext()) { - 
return new String(iterator.next().getValue().get()); - } - } catch (Exception e) { - throw new RyaDAOException(e); - } - return null; - } - - @Override - public void removeNamespace(String pfx) throws RyaDAOException { - try { - Mutation del = new Mutation(new Text(pfx)); - del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); - bw_ns.addMutation(del); - bw_ns.flush(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); - Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); - return new AccumuloNamespaceTableIterator(result); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() { - return this; - } - - @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - for (String tableName : getTables()) { - try { - purge(tableName, configuration.getAuths()); - compact(tableName); - } catch (TableNotFoundException e) { - logger.error(e.getMessage()); - } catch (MutationsRejectedException e) { - logger.error(e.getMessage()); - } - } - } - - @Override - public void dropAndDestroy() throws RyaDAOException { - for (String tableName : getTables()) { - try { - drop(tableName); - } catch (AccumuloSecurityException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (AccumuloException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (TableNotFoundException e) { - logger.warn(e.getMessage()); - } - } - destroy(); - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - public 
BatchWriterConfig getBatchWriterConfig(){ - return batchWriterConfig; - } - - public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) { - this.batchWriterConfig = batchWriterConfig; - } - - protected MultiTableBatchWriter getMultiTableBatchWriter(){ - return mt_bw; - } - - public AccumuloRdfConfiguration getConf() { - return conf; - } - - public void setConf(AccumuloRdfConfiguration conf) { - this.conf = conf; - } - - public RyaTableMutationsFactory getRyaTableMutationsFactory() { - return ryaTableMutationsFactory; - } - - public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { - this.ryaTableMutationsFactory = ryaTableMutationsFactory; - } - - public AccumuloRyaQueryEngine getQueryEngine() { - return queryEngine; - } - - public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) { - this.queryEngine = queryEngine; - } - - protected String[] getTables() { - // core tables - List<String> tableNames = Lists.newArrayList( - tableLayoutStrategy.getSpo(), - tableLayoutStrategy.getPo(), - tableLayoutStrategy.getOsp(), - tableLayoutStrategy.getNs(), - tableLayoutStrategy.getEval()); - - // Additional Tables - for (AccumuloIndexer index : secondaryIndexers) { - tableNames.add(index.getTableName()); - } - - return tableNames.toArray(new String[]{}); - } - - private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { - if (tableExists(tableName)) { - logger.info("Purging accumulo table: " + tableName); - BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); - try { - batchDeleter.setRanges(Collections.singleton(new Range())); - batchDeleter.delete(); - } finally { - batchDeleter.close(); - } - } - } - - private void compact(String tableName) { - logger.info("Requesting major compaction for table " + tableName); - try { - connector.tableOperations().compact(tableName, null, null, true, false); - } catch (Exception e) { - 
logger.error(e.getMessage()); - } - } - - private boolean tableExists(String tableName) { - return getConnector().tableOperations().exists(tableName); - } - - private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { - return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); - } - - private void checkVersion() throws RyaDAOException { - String version = getVersion(); - if (version == null) { - this.add(getVersionRyaStatement()); - } - //TODO: Do a version check here - } - - protected RyaStatement getVersionRyaStatement() { - return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); - } - - private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - logger.info("Dropping cloudbase table: " + tableName); - connector.tableOperations().delete(tableName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java deleted file mode 100644 index b5a4e84..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -//package mvm.rya.accumulo; - -// -//import com.google.common.io.ByteArrayDataOutput; -//import com.google.common.io.ByteStreams; -//import mvm.rya.api.RdfCloudTripleStoreUtils; -//import mvm.rya.api.domain.RangeValue; -//import org.apache.accumulo.core.data.Range; -//import org.apache.hadoop.io.Text; -//import org.openrdf.model.Value; -//import org.openrdf.model.ValueFactory; -//import org.openrdf.model.impl.ValueFactoryImpl; -// -//import java.io.IOException; -//import java.util.Map; -// -//import static mvm.rya.api.RdfCloudTripleStoreConstants.*; -//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry; -// -///** -// * Class DefineTripleQueryRangeFactory -// * Date: Jun 2, 2011 -// * Time: 10:35:43 AM -// */ -//public class DefineTripleQueryRangeFactory { -// -// ValueFactory vf = ValueFactoryImpl.getInstance(); -// -// protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty) -// throws IOException { -// if(!empty) { -// startRowOut.write(DELIM_BYTES); -// endRowOut.write(DELIM_BYTES); -// } -// //null check? 
-// if(val instanceof RangeValue) { -// RangeValue rangeValue = (RangeValue) val; -// Value start = rangeValue.getStart(); -// Value end = rangeValue.getEnd(); -// byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start); -// byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end); -// startRowOut.write(start_val_bytes); -// endRowOut.write(end_val_bytes); -// } else { -// byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val); -// startRowOut.write(val_bytes); -// endRowOut.write(val_bytes); -// } -// } -// -// public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf) -// throws IOException { -// -// byte[] startrow, stoprow; -// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); -// ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput(); -// Range range; -// TABLE_LAYOUT tableLayout; -// -// if (subject != null) { -// /** -// * Case: s -// * Table: spo -// * Want this to be the first if statement since it will be most likely the most asked for table -// */ -// tableLayout = TABLE_LAYOUT.SPO; -// fillRange(startRowOut, stopRowOut, subject, true); -// if (predicate != null) { -// /** -// * Case: sp -// * Table: spo -// */ -// fillRange(startRowOut, stopRowOut, predicate, false); -// if (object != null) { -// /** -// * Case: spo -// * Table: spo -// */ -// fillRange(startRowOut, stopRowOut, object, false); -// } -// } else if (object != null) { -// /** -// * Case: so -// * Table: osp -// * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement -// * for best performance. The SPO table probably gets the most scans, so I want it to be the first if -// * statement in the branch. 
-// */ -// tableLayout = TABLE_LAYOUT.OSP; -// startRowOut = ByteStreams.newDataOutput(); -// stopRowOut = ByteStreams.newDataOutput(); -// fillRange(startRowOut, stopRowOut, object, true); -// fillRange(startRowOut, stopRowOut, subject, false); -// } -// } else if (predicate != null) { -// /** -// * Case: p -// * Table: po -// * Wanted this to be the second if statement, since it will be the second most asked for table -// */ -// tableLayout = TABLE_LAYOUT.PO; -// fillRange(startRowOut, stopRowOut, predicate, true); -// if (object != null) { -// /** -// * Case: po -// * Table: po -// */ -// fillRange(startRowOut, stopRowOut, object, false); -// } -// } else if (object != null) { -// /** -// * Case: o -// * Table: osp -// * Probably a pretty rare scenario -// */ -// tableLayout = TABLE_LAYOUT.OSP; -// fillRange(startRowOut, stopRowOut, object, true); -// } else { -// tableLayout = TABLE_LAYOUT.SPO; -// stopRowOut.write(Byte.MAX_VALUE); -// } -// -// startrow = startRowOut.toByteArray(); -// stopRowOut.write(DELIM_STOP_BYTES); -// stoprow = stopRowOut.toByteArray(); -// Text startRowTxt = new Text(startrow); -// Text stopRowTxt = new Text(stoprow); -// range = new Range(startRowTxt, stopRowTxt); -// -// return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range); -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java deleted file mode 100644 index 574029e..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java +++ /dev/null @@ -1,115 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; - -import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; - -public class RyaTableKeyValues { - public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); - public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); - - RyaTripleContext instance; - - private RyaStatement stmt; - private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); - - public RyaTableKeyValues(RyaStatement stmt, 
RdfCloudTripleStoreConfiguration conf) { - this.stmt = stmt; - this.instance = RyaTripleContext.getInstance(conf); - } - - public Collection<Map.Entry<Key, Value>> getSpo() { - return spo; - } - - public Collection<Map.Entry<Key, Value>> getPo() { - return po; - } - - public Collection<Map.Entry<Key, Value>> getOsp() { - return osp; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public RyaTableKeyValues invoke() throws IOException { - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? - */try { - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - timestamp = timestamp == null ? 0l : timestamp; - byte[] value = tripleRow.getValue(); - Value v = value == null ? 
EMPTY_VALUE : new Value(value); - spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - } catch (TripleRowResolverException e) { - throw new IOException(e); - } - return this; - } - - @Override - public String toString() { - return "RyaTableKeyValues{" + - "statement=" + stmt + - ", spo=" + spo + - ", po=" + po + - ", o=" + osp + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java deleted file mode 100644 index 0dbafc1..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; - -public class RyaTableMutationsFactory { - - RyaTripleContext ryaContext; - - public RyaTableMutationsFactory(RyaTripleContext ryaContext) { - this.ryaContext = ryaContext; - } - - //TODO: Does this still need to be collections - public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( - RyaStatement stmt) throws IOException { - - Collection<Mutation> spo_muts = new ArrayList<Mutation>(); - Collection<Mutation> po_muts = new ArrayList<Mutation>(); - Collection<Mutation> osp_muts = new ArrayList<Mutation>(); - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the 
named graphs? - */ - try { - Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); - spo_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.PO); - po_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.OSP); - osp_muts.add(createMutation(tripleRow)); - } catch (TripleRowResolverException fe) { - throw new IOException(fe); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = - new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); - - return mutations; - } - - protected Mutation createMutation(TripleRow tripleRow) { - Mutation mutation = new Mutation(new Text(tripleRow.getRow())); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - byte[] value = tripleRow.getValue(); - Value v = value == null ? EMPTY_VALUE : new Value(value); - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? 
EMPTY_TEXT : new Text(columnFamily); - - mutation.put(cfText, cqText, cv, timestamp, v); - return mutation; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java deleted file mode 100644 index 5df5da9..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java +++ /dev/null @@ -1,59 +0,0 @@ -package mvm.rya.accumulo.experimental; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.io.IOException; -import java.util.Collection; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; - -import org.apache.accumulo.core.client.MultiTableBatchWriter; - -public abstract class AbstractAccumuloIndexer implements AccumuloIndexer { - - @Override - public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { - } - - @Override - public void storeStatements(Collection<RyaStatement> statements) throws IOException { - for (RyaStatement s : statements) { - storeStatement(s); - } - } - - @Override - public void deleteStatement(RyaStatement stmt) throws IOException { - } - - @Override - public void dropGraph(RyaURI... graphs) { - } - - @Override - public void flush() throws IOException { - } - - @Override - public void close() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java deleted file mode 100644 index 2329831..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AccumuloIndexer.java +++ /dev/null @@ -1,33 +0,0 @@ -package mvm.rya.accumulo.experimental; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.IOException; - -import mvm.rya.api.persist.index.RyaSecondaryIndexer; - -import org.apache.accumulo.core.client.MultiTableBatchWriter; - -public interface AccumuloIndexer extends RyaSecondaryIndexer { - - public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java deleted file mode 100644 index 000c08a..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java +++ /dev/null @@ -1,164 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import mvm.rya.accumulo.AccumuloRdfConstants; -import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreUtils; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.iterators.user.AgeOffFilter; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job; - -/** - */ -public abstract class AbstractAccumuloMRTool { - - protected Configuration conf; - protected RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP; - protected String userName = "root"; - protected String pwd = "root"; - protected String instance = "instance"; - protected String zk = "zoo"; - protected Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; - protected String ttl = null; - protected boolean mock = false; - protected boolean hdfsInput = false; - protected String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; - - protected void init() { - zk = conf.get(MRUtils.AC_ZK_PROP, zk); - ttl = conf.get(MRUtils.AC_TTL_PROP, ttl); - instance = 
conf.get(MRUtils.AC_INSTANCE_PROP, instance); - userName = conf.get(MRUtils.AC_USERNAME_PROP, userName); - pwd = conf.get(MRUtils.AC_PWD_PROP, pwd); - mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, mock); - hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, hdfsInput); - tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix); - if (tablePrefix != null) - RdfCloudTripleStoreConstants.prefixTables(tablePrefix); - rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf( - conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString())); - String auth = conf.get(MRUtils.AC_AUTH_PROP); - if (auth != null) - authorizations = new Authorizations(auth.split(",")); - - if (!mock) { - conf.setBoolean("mapred.map.tasks.speculative.execution", false); - conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - conf.set("io.sort.mb", "256"); - } - - //set ttl - ttl = conf.get(MRUtils.AC_TTL_PROP); - } - - protected void setupInputFormat(Job job) throws AccumuloSecurityException { - // set up accumulo input - if (!hdfsInput) { - job.setInputFormatClass(AccumuloInputFormat.class); - } else { - job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); - } - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); - AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); - AccumuloInputFormat.setScanAuthorizations(job, authorizations); - if (!mock) { - AccumuloInputFormat.setZooKeeperInstance(job, instance, zk); - } else { - AccumuloInputFormat.setMockInstance(job, instance); - } - if (ttl != null) { - IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); - AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); - AccumuloInputFormat.addIterator(job, setting); - } - } - - protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException { - 
AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); - AccumuloOutputFormat.setCreateTables(job, true); - AccumuloOutputFormat.setDefaultTableName(job, outputTable); - if (!mock) { - AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk); - } else { - AccumuloOutputFormat.setMockInstance(job, instance); - } - job.setOutputFormatClass(AccumuloOutputFormat.class); - } - - public void setConf(Configuration configuration) { - this.conf = configuration; - } - - public Configuration getConf() { - return conf; - } - - public String getInstance() { - return instance; - } - - public void setInstance(String instance) { - this.instance = instance; - } - - public String getPwd() { - return pwd; - } - - public void setPwd(String pwd) { - this.pwd = pwd; - } - - public String getZk() { - return zk; - } - - public void setZk(String zk) { - this.zk = zk; - } - - public String getTtl() { - return ttl; - } - - public void setTtl(String ttl) { - this.ttl = ttl; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java deleted file mode 100644 index ee1004d..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java +++ /dev/null @@ -1,258 +0,0 @@ -package mvm.rya.accumulo.mr.eval; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.Date; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRdfConstants; -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -import com.google.common.collect.Lists; -import com.google.common.io.ByteArrayDataInput; -import 
com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

/**
 * MapReduce tool that counts how often each subject and each predicate occurs in
 * the configured triple table and stores the counts in the evaluation table
 * ({@code tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX}).
 * <p>
 * Objects are not counted. Counts are kept separately per context when a
 * statement carries one.
 * <p>
 * Intermediate key layout (written by {@link CountPiecesMapper}, read by
 * {@link CountPiecesReducer}), using {@code DataOutput} encoding:
 * <ol>
 * <li>{@code writeUTF} - the subject or predicate value</li>
 * <li>{@code writeUTF} - the column family ({@code SUBJECT_CF} or {@code PRED_CF})</li>
 * <li>{@code writeBoolean} - whether a context follows</li>
 * <li>{@code writeUTF} - the context value, only when the flag is {@code true}</li>
 * </ol>
 *
 * Date: Apr 12, 2011
 * Time: 10:39:40 AM
 *
 * @deprecated flagged as deprecated by the original authors; this file does not name a replacement.
 */
public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {

    /**
     * Command-line entry point. Runs the tool through {@link ToolRunner}.
     * Any exception is printed to standard error; the tool's return code is
     * not converted into a process exit status.
     *
     * @param args command-line arguments handed to {@link ToolRunner}
     */
    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures and runs the counting job, blocking until it finishes.
     *
     * @param strings remaining command-line arguments (not used here)
     * @return {@code 0} when the job succeeds, {@code -1} when it fails
     * @throws Exception if initialization, job setup, or job execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");

        // read connection settings, table prefix and layout from the configuration
        init();

        Job job = new Job(conf);
        job.setJarByClass(AccumuloRdfCountTool.class);
        setupInputFormat(job);

        // scan from the empty row up to the single-byte row 0x7f
        Range scanRange = new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}));
        AccumuloInputFormat.setRanges(job, Lists.newArrayList(scanRange));

        // map side emits (encoded key, count); reduce side emits (table name, mutation)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Mutation.class);

        job.setMapperClass(CountPiecesMapper.class);
        job.setCombinerClass(CountPiecesCombiner.class);
        job.setReducerClass(CountPiecesReducer.class);

        String outputTable = tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
        setupOutputFormat(job, outputTable);

        // submit and wait
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        if (!job.waitForCompletion(true)) {
            System.out.println("Job Failed!!!");
            return -1;
        }

        Date endTime = new Date();
        System.out.println("Job ended: " + endTime);
        System.out.println("The job took "
                + (endTime.getTime() - startTime.getTime()) / 1000
                + " seconds.");
        return 0;
    }

    /**
     * Emits one {@code (encoded key, 1)} pair for the subject and one for the
     * predicate of every triple row read from the input table.
     */
    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {

        public static final byte[] EMPTY_BYTES = new byte[0];
        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;

        ValueFactoryImpl vf = new ValueFactoryImpl();

        private Text keyOut = new Text();
        private LongWritable valOut = new LongWritable(1);
        private RyaTripleContext ryaContext;

        /**
         * Reads the table layout (default {@code OSP}) from the job configuration
         * and creates the triple context used to decode rows.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
        }

        /**
         * Decodes the row into a statement and writes one count of 1 for its
         * subject and one for its predicate.
         *
         * @throws IOException if the row cannot be resolved into a statement
         *                     (wraps the {@link TripleRowResolverException})
         */
        @Override
        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
            try {
                TripleRow tripleRow = new TripleRow(
                        key.getRow().getBytes(),
                        key.getColumnFamily().getBytes(),
                        key.getColumnQualifier().getBytes());
                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, tripleRow);

                // resolve every piece before anything is written
                String subj = statement.getSubject().getData();
                String pred = statement.getPredicate().getData();
                RyaURI statementContext = statement.getContext();
                boolean includesContext = statementContext != null;
                String contextData = includesContext ? statementContext.getData() : null;

                keyOut.set(encodeKey(subj, RdfCloudTripleStoreConstants.SUBJECT_CF, includesContext, contextData));
                context.write(keyOut, valOut);

                keyOut.set(encodeKey(pred, RdfCloudTripleStoreConstants.PRED_CF, includesContext, contextData));
                context.write(keyOut, valOut);
            } catch (TripleRowResolverException e) {
                throw new IOException(e);
            }
        }

        /**
         * Serializes one counting key: value, column family, context flag and,
         * when the flag is set, the context value.
         */
        private static byte[] encodeKey(String value, String columnFamily, boolean includesContext, String contextData) {
            ByteArrayDataOutput output = ByteStreams.newDataOutput();
            output.writeUTF(value);
            output.writeUTF(columnFamily);
            output.writeBoolean(includesContext);
            if (includesContext) {
                output.writeUTF(contextData);
            }
            return output.toByteArray();
        }
    }

    /**
     * Map-side pre-aggregation: sums the counts seen for a key and forwards the
     * partial sum.
     */
    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable valOut = new LongWritable();

        /**
         * Former cut-off for partial sums. It is no longer applied: a combiner
         * sees only part of a key's values, so dropping small partial sums would
         * lose counts whose overall total is large. Kept so that existing
         * references to the constant still compile.
         */
        public static final int TOO_LOW = 2;

        /**
         * Adds up the values for {@code key} and always emits the sum; filtering
         * of low totals is left to {@link CountPiecesReducer}.
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable lw : values) {
                count += lw.get();
            }

            valOut.set(count);
            context.write(key, valOut);
        }

    }

    /**
     * Sums the counts for each key and, when the total is above
     * {@link #TOO_LOW}, writes it as a mutation to the evaluation table.
     */
    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {

        Text row = new Text();
        Text cat_txt = new Text();
        Value v_out = new Value();
        ValueFactory vf = new ValueFactoryImpl();

        // any total at or below this is not saved
        public static final int TOO_LOW = 10;
        private String tablePrefix;
        protected Text table;
        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;

        /**
         * Resolves the output table name from the configured table prefix
         * (default {@code TBL_PRFX_DEF}) and, if {@code MRUtils.AC_CV_PROP} is
         * set, the column visibility applied to every written count.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration jobConf = context.getConfiguration();
            tablePrefix = jobConf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
            final String cv_s = jobConf.get(MRUtils.AC_CV_PROP);
            if (cv_s != null) {
                cv = new ColumnVisibility(cv_s);
            }
        }

        /**
         * Writes {@code row = value, family = column family, qualifier = context
         * (or empty), value = decimal count} for keys whose total exceeds
         * {@link #TOO_LOW}.
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable lw : values) {
                count += lw.get();
            }

            if (count <= TOO_LOW) {
                return;
            }

            // Text.getBytes() exposes the backing array, which can be longer than
            // getLength(); only the fields the mapper wrote are read back here.
            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
            String v = badi.readUTF();
            cat_txt.set(badi.readUTF());

            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
            boolean includesContext = badi.readBoolean();
            if (includesContext) {
                columnQualifier = new Text(badi.readUTF());
            }

            row.set(v);
            Mutation m = new Mutation(row);
            v_out.set(Long.toString(count).getBytes());
            m.put(cat_txt, columnQualifier, cv, v_out);
            context.write(table, m);
        }

    }
}
