http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java b/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java deleted file mode 100644 index c431468..0000000 --- a/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java +++ /dev/null @@ -1,51 +0,0 @@ -package mvm.rya.rdftriplestore.provenance; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import static org.junit.Assert.assertTrue; - -import org.junit.Test; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.QueryLanguage; -import org.openrdf.query.TupleQuery; -import org.openrdf.query.TupleQueryResult; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.sail.Sail; -import org.openrdf.sail.memory.MemoryStore; - -public class TriplestoreProvenanceCollectorTest { - - @Test - public void testCollect() throws ProvenanceCollectionException, RepositoryException, MalformedQueryException, QueryEvaluationException { - Sail ms = new MemoryStore(); - SailRepository repo = new SailRepository(ms); - repo.initialize(); - TriplestoreProvenanceCollector coll = new TriplestoreProvenanceCollector(repo, "fakeUser", "SPARQL"); - coll.recordQuery("fakeQuery"); - String queryString = "SELECT ?x ?y WHERE { ?x ?p ?y } "; - TupleQuery tupleQuery = repo.getConnection().prepareTupleQuery(QueryLanguage.SPARQL, queryString); - TupleQueryResult result = tupleQuery.evaluate(); - // TODO not asserting on the results. - assertTrue(result.hasNext()); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java ---------------------------------------------------------------------- diff --git a/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java b/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java deleted file mode 100644 index 99875e2..0000000 --- a/common/rya.provenance/src/test/java/mvm/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package mvm.rya.rdftriplestore.provenance.rdf; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import static org.junit.Assert.assertTrue; - -import java.util.List; - -import org.junit.Test; -import org.openrdf.model.Statement; - -public class BaseProvenanceModelTest { - - @Test - public void testCreateTriples() { - BaseProvenanceModel model = new BaseProvenanceModel(); - List<Statement> statements = model.getStatementsForQuery("SELECT ?query where { ?query rdf:type <rya:query>. }", "fakeuser", "SPARQL"); - assertTrue(!statements.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java b/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java new file mode 100644 index 0000000..c431468 --- /dev/null +++ b/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/TriplestoreProvenanceCollectorTest.java @@ -0,0 +1,51 @@ +package mvm.rya.rdftriplestore.provenance; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.sail.Sail; +import org.openrdf.sail.memory.MemoryStore; + +public class TriplestoreProvenanceCollectorTest { + + @Test + public void testCollect() throws ProvenanceCollectionException, RepositoryException, MalformedQueryException, QueryEvaluationException { + Sail ms = new MemoryStore(); + SailRepository repo = new SailRepository(ms); + repo.initialize(); + TriplestoreProvenanceCollector coll = new TriplestoreProvenanceCollector(repo, "fakeUser", "SPARQL"); + coll.recordQuery("fakeQuery"); + String queryString = "SELECT ?x ?y WHERE { ?x ?p ?y } "; + TupleQuery tupleQuery = repo.getConnection().prepareTupleQuery(QueryLanguage.SPARQL, queryString); + TupleQueryResult result = tupleQuery.evaluate(); + // TODO not asserting on the results. + assertTrue(result.hasNext()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java ---------------------------------------------------------------------- diff --git a/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java b/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java new file mode 100644 index 0000000..99875e2 --- /dev/null +++ b/common/rya.provenance/src/test/java/org/apache/rya/rdftriplestore/provenance/rdf/BaseProvenanceModelTest.java @@ -0,0 +1,38 @@ +package mvm.rya.rdftriplestore.provenance.rdf; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.junit.Test; +import org.openrdf.model.Statement; + +public class BaseProvenanceModelTest { + + @Test + public void testCreateTriples() { + BaseProvenanceModel model = new BaseProvenanceModel(); + List<Statement> statements = model.getStatementsForQuery("SELECT ?query where { ?query rdf:type <rya:query>. }", "fakeuser", "SPARQL"); + assertTrue(!statements.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloNamespaceTableIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloNamespaceTableIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloNamespaceTableIterator.java deleted file mode 100644 index ebca6a2..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloNamespaceTableIterator.java +++ /dev/null @@ -1,99 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import com.google.common.base.Preconditions; -import info.aduna.iteration.CloseableIteration; -import mvm.rya.api.persist.RdfDAOException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.openrdf.model.Namespace; -import org.openrdf.model.impl.NamespaceImpl; - -import java.io.IOError; -import java.util.Iterator; -import java.util.Map.Entry; - -public class AccumuloNamespaceTableIterator<T extends Namespace> implements - CloseableIteration<Namespace, RdfDAOException> { - - private boolean open = false; - private Iterator<Entry<Key, Value>> result; - - public AccumuloNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException { - Preconditions.checkNotNull(result); - open = true; - this.result = result; - } - - @Override - public void close() throws RdfDAOException { - try { - verifyIsOpen(); - open = false; - } catch (IOError e) { - throw new RdfDAOException(e); - } - } - - public void verifyIsOpen() throws RdfDAOException { - if (!open) { - throw new RdfDAOException("Iterator not open"); - } - } - - @Override - public boolean hasNext() throws RdfDAOException { - verifyIsOpen(); - return result != null && result.hasNext(); - } - - @Override - public Namespace next() throws RdfDAOException { - if (hasNext()) { - return getNamespace(result); - } - return null; - } - - public static Namespace getNamespace(Iterator<Entry<Key, Value>> rowResults) { - for (; rowResults.hasNext(); ) { - Entry<Key, Value> next = rowResults.next(); - Key key = next.getKey(); - Value val = next.getValue(); - String cf = key.getColumnFamily().toString(); - String cq = key.getColumnQualifier().toString(); - return new NamespaceImpl(key.getRow().toString(), new String( - val.get())); - } - return null; - } - - @Override - public void remove() throws RdfDAOException { - next(); - } - - public boolean isOpen() { - return open; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java deleted file mode 100644 index 709ceb9..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConfiguration.java +++ /dev/null @@ -1,158 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Lists; - -/** - * Created by IntelliJ IDEA. - * Date: 4/25/12 - * Time: 3:24 PM - * To change this template use File | Settings | File Templates. - */ -public class AccumuloRdfConfiguration extends RdfCloudTripleStoreConfiguration { - - public static final String MAXRANGES_SCANNER = "ac.query.maxranges"; - - public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; - - public static final String CONF_FLUSH_EACH_UPDATE = "ac.dao.flush"; - - public static final String ITERATOR_SETTINGS_SIZE = "ac.iterators.size"; - public static final String ITERATOR_SETTINGS_BASE = "ac.iterators.%d."; - public static final String ITERATOR_SETTINGS_NAME = ITERATOR_SETTINGS_BASE + "name"; - public static final String ITERATOR_SETTINGS_CLASS = ITERATOR_SETTINGS_BASE + "iteratorClass"; - public static final String ITERATOR_SETTINGS_PRIORITY = ITERATOR_SETTINGS_BASE + "priority"; - public static final String ITERATOR_SETTINGS_OPTIONS_SIZE = ITERATOR_SETTINGS_BASE + "optionsSize"; - public static final String ITERATOR_SETTINGS_OPTIONS_KEY = ITERATOR_SETTINGS_BASE + "option.%d.name"; - public static final String ITERATOR_SETTINGS_OPTIONS_VALUE = ITERATOR_SETTINGS_BASE + "option.%d.value"; - - public AccumuloRdfConfiguration() { - super(); - } - - public AccumuloRdfConfiguration(Configuration other) { - super(other); - } - - @Override - public AccumuloRdfConfiguration clone() { - return new AccumuloRdfConfiguration(this); - } - - public Authorizations getAuthorizations() { - String[] auths = getAuths(); - if (auths == null || auths.length == 0) - return AccumuloRdfConstants.ALL_AUTHORIZATIONS; - return new Authorizations(auths); - } - - public void setMaxRangesForScanner(Integer max) { - setInt(MAXRANGES_SCANNER, max); - } - - public Integer getMaxRangesForScanner() { - return getInt(MAXRANGES_SCANNER, 2); - } - - public void setAdditionalIndexers(Class<? extends AccumuloIndexer>... indexers) { - List<String> strs = Lists.newArrayList(); - for (Class<? extends AccumuloIndexer> ai : indexers){ - strs.add(ai.getName()); - } - - setStrings(CONF_ADDITIONAL_INDEXERS, strs.toArray(new String[]{})); - } - - public List<AccumuloIndexer> getAdditionalIndexers() { - return getInstances(CONF_ADDITIONAL_INDEXERS, AccumuloIndexer.class); - } - public boolean flushEachUpdate(){ - return getBoolean(CONF_FLUSH_EACH_UPDATE, true); - } - - public void setFlush(boolean flush){ - setBoolean(CONF_FLUSH_EACH_UPDATE, flush); - } - - public void setAdditionalIterators(IteratorSetting... additionalIterators){ - //TODO do we need to worry about cleaning up - this.set(ITERATOR_SETTINGS_SIZE, Integer.toString(additionalIterators.length)); - int i = 0; - for(IteratorSetting iterator : additionalIterators) { - this.set(String.format(ITERATOR_SETTINGS_NAME, i), iterator.getName()); - this.set(String.format(ITERATOR_SETTINGS_CLASS, i), iterator.getIteratorClass()); - this.set(String.format(ITERATOR_SETTINGS_PRIORITY, i), Integer.toString(iterator.getPriority())); - Map<String, String> options = iterator.getOptions(); - - this.set(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i), Integer.toString(options.size())); - Iterator<Entry<String, String>> it = options.entrySet().iterator(); - int j = 0; - while(it.hasNext()) { - Entry<String, String> item = it.next(); - this.set(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j), item.getKey()); - this.set(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j), item.getValue()); - j++; - } - i++; - } - } - - public IteratorSetting[] getAdditionalIterators(){ - int size = Integer.valueOf(this.get(ITERATOR_SETTINGS_SIZE, "0")); - if(size == 0) { - return new IteratorSetting[0]; - } - - IteratorSetting[] settings = new IteratorSetting[size]; - for(int i = 0; i < size; i++) { - String name = this.get(String.format(ITERATOR_SETTINGS_NAME, i)); - String iteratorClass = this.get(String.format(ITERATOR_SETTINGS_CLASS, i)); - int priority = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_PRIORITY, i))); - - int optionsSize = Integer.valueOf(this.get(String.format(ITERATOR_SETTINGS_OPTIONS_SIZE, i))); - Map<String, String> options = new HashMap<String, String>(optionsSize); - for(int j = 0; j < optionsSize; j++) { - String key = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_KEY, i, j)); - String value = this.get(String.format(ITERATOR_SETTINGS_OPTIONS_VALUE, i, j)); - options.put(key, value); - } - settings[i] = new IteratorSetting(priority, name, iteratorClass, options); - } - - return settings; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java deleted file mode 100644 index 1ec57a7..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfConstants.java +++ /dev/null @@ -1,40 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; - -/** - * Interface AccumuloRdfConstants - * Date: Mar 1, 2012 - * Time: 7:24:52 PM - */ -public interface AccumuloRdfConstants { - public static final Authorizations ALL_AUTHORIZATIONS = Constants.NO_AUTHS; - - public static final Value EMPTY_VALUE = new Value(new byte[0]); - - public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]); -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java deleted file mode 100644 index a3e0677..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfEvalStatsDAO.java +++ /dev/null @@ -1,173 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.PRED_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECT_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTPRED_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.PREDOBJECT_CF_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.SUBJECTOBJECT_CF_TXT; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreStatement; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RdfDAOException; -import mvm.rya.api.persist.RdfEvalStatsDAO; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Resource; -import org.openrdf.model.Value; - -/** - * Class CloudbaseRdfEvalStatsDAO - * Date: Feb 28, 2012 - * Time: 5:03:16 PM - */ -public class AccumuloRdfEvalStatsDAO implements RdfEvalStatsDAO<AccumuloRdfConfiguration> { - - private boolean initialized = false; - private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - - private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>(); - private Connector connector; - - // private String evalTable = TBL_EVAL; - private TableLayoutStrategy tableLayoutStrategy; - - @Override - public void init() throws RdfDAOException { - try { - if (isInitialized()) { - throw new IllegalStateException("Already initialized"); - } - checkNotNull(connector); - tableLayoutStrategy = conf.getTableLayoutStrategy(); -// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); -// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); - - TableOperations tos = connector.tableOperations(); - AccumuloRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval()); -// boolean tableExists = tos.exists(evalTable); -// if (!tableExists) -// tos.create(evalTable); - initialized = true; - } catch (Exception e) { - throw new RdfDAOException(e); - } - } - - - @Override - public void destroy() throws RdfDAOException { - if (!isInitialized()) { - throw new IllegalStateException("Not initialized"); - } - initialized = false; - } - - @Override - public boolean isInitialized() throws RdfDAOException { - return initialized; - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - public AccumuloRdfConfiguration getConf() { - return conf; - } - - public void setConf(AccumuloRdfConfiguration conf) { - this.conf = conf; - } - - @Override - public double getCardinality(AccumuloRdfConfiguration conf, - mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, - List<Value> val, Resource context) throws RdfDAOException { - try { - Authorizations authorizations = conf.getAuthorizations(); - Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations); - Text cfTxt = null; - if (CARDINALITY_OF.SUBJECT.equals(card)) { - cfTxt = SUBJECT_CF_TXT; - } else if (CARDINALITY_OF.PREDICATE.equals(card)) { - cfTxt = PRED_CF_TXT; - } else if (CARDINALITY_OF.OBJECT.equals(card)) { -// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality - return Double.MAX_VALUE; - } else if (CARDINALITY_OF.SUBJECTOBJECT.equals(card)) { - cfTxt = SUBJECTOBJECT_CF_TXT; - } else if (CARDINALITY_OF.SUBJECTPREDICATE.equals(card)) { - cfTxt = SUBJECTPRED_CF_TXT; - } else if (CARDINALITY_OF.PREDICATEOBJECT.equals(card)) { - cfTxt = PREDOBJECT_CF_TXT; - } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]"); - Text cq = EMPTY_TEXT; - if (context != null) { - cq = new Text(context.stringValue().getBytes()); - } - scanner.fetchColumn(cfTxt, cq); - Iterator<Value> vals = val.iterator(); - String compositeIndex = vals.next().stringValue(); - while (vals.hasNext()){ - compositeIndex += DELIM + vals.next().stringValue(); - } - scanner.setRange(new Range(new Text(compositeIndex.getBytes()))); - Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iter = scanner.iterator(); - if (iter.hasNext()) { - return Double.parseDouble(new String(iter.next().getValue().get())); - } - } catch (Exception e) { - throw new RdfDAOException(e); - } - - //default - return -1; - } - - @Override - public double getCardinality(AccumuloRdfConfiguration conf, - mvm.rya.api.persist.RdfEvalStatsDAO.CARDINALITY_OF card, - List<Value> val) throws RdfDAOException { - return getCardinality(conf, card, val, null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java deleted file mode 100644 index d13f50e..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfQueryIterator.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -//package mvm.rya.accumulo; - -// -//import com.google.common.collect.Iterators; -//import com.google.common.io.ByteArrayDataInput; -//import com.google.common.io.ByteStreams; -//import info.aduna.iteration.CloseableIteration; -//import mvm.rya.api.RdfCloudTripleStoreConstants; -//import mvm.rya.api.RdfCloudTripleStoreUtils; -//import mvm.rya.api.persist.RdfDAOException; -//import mvm.rya.api.utils.NullableStatementImpl; -//import org.apache.accumulo.core.client.*; -//import org.apache.accumulo.core.data.Key; -//import org.apache.accumulo.core.data.Range; -//import org.apache.accumulo.core.iterators.user.AgeOffFilter; -//import org.apache.accumulo.core.iterators.user.TimestampFilter; -//import org.apache.accumulo.core.security.Authorizations; -//import org.apache.hadoop.io.Text; -//import org.openrdf.model.Resource; -//import org.openrdf.model.Statement; -//import org.openrdf.model.URI; -//import org.openrdf.model.Value; -//import org.openrdf.query.BindingSet; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -// -//import java.io.IOException; -//import java.util.Collection; -//import java.util.Collections; -//import java.util.HashSet; -//import java.util.Iterator; -//import java.util.Map.Entry; -// -//import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; -//import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -//import static mvm.rya.api.RdfCloudTripleStoreUtils.writeValue; -// -//public class AccumuloRdfQueryIterator implements -// CloseableIteration<Entry<Statement, BindingSet>, RdfDAOException> { -// -// protected final Logger logger = LoggerFactory.getLogger(getClass()); -// -// private boolean open = false; -// private Iterator result; -// private Resource[] contexts; -// private Collection<Entry<Statement, BindingSet>> statements; -// private int numOfThreads = 20; -// -// private RangeBindingSetEntries rangeMap = new RangeBindingSetEntries(); -// private ScannerBase scanner; -// private boolean isBatchScanner = true; -// private Statement statement; -// Iterator<BindingSet> iter_bss = null; -// -// private boolean hasNext = true; -// private AccumuloRdfConfiguration conf; -// private TABLE_LAYOUT tableLayout; -// private Text context_txt; -// -// private DefineTripleQueryRangeFactory queryRangeFactory = new DefineTripleQueryRangeFactory(); -// -// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, Resource... contexts) -// throws RdfDAOException { -// this(statements, connector, null, contexts); -// } -// -// public AccumuloRdfQueryIterator(Collection<Entry<Statement, BindingSet>> statements, Connector connector, -// AccumuloRdfConfiguration conf, Resource... contexts) -// throws RdfDAOException { -// this.statements = statements; -// this.contexts = contexts; -// this.conf = conf; -// initialize(connector); -// open = true; -// } -// -// public AccumuloRdfQueryIterator(Resource subject, URI predicate, Value object, Connector connector, -// AccumuloRdfConfiguration conf, Resource[] contexts) throws RdfDAOException { -// this(Collections.<Entry<Statement, BindingSet>>singleton(new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>( -// new NullableStatementImpl(subject, predicate, object, contexts), -// null)), connector, conf, contexts); -// } -// -// protected void initialize(Connector connector) -// throws RdfDAOException { -// try { -// //TODO: We cannot span multiple tables here -// Collection<Range> ranges = new HashSet<Range>(); -// -// result = Iterators.emptyIterator(); -// Long startTime = conf.getStartTime(); -// Long ttl = conf.getTtl(); -// -// Resource context = null; -// for (Entry<Statement, BindingSet> stmtbs : statements) { -// Statement stmt = stmtbs.getKey(); -// Resource subject = stmt.getSubject(); -// URI predicate = stmt.getPredicate(); -// Value object = stmt.getObject(); -// context = stmt.getContext(); //TODO: assumes the same context for all statements -// logger.debug("Batch Scan, lookup subject[" + subject + "] predicate[" + predicate + "] object[" + object + "] combination"); -// -// Entry<TABLE_LAYOUT, Range> entry = queryRangeFactory.defineRange(subject, predicate, object, conf); -// tableLayout = entry.getKey(); -//// isTimeRange = isTimeRange || queryRangeFactory.isTimeRange(); -// Range range = entry.getValue(); -// ranges.add(range); -// rangeMap.ranges.add(new RdfCloudTripleStoreUtils.CustomEntry<Range, BindingSet>(range, stmtbs.getValue())); -// } -// -// Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; -// String auth = conf.getAuth(); -// if (auth != null) { -// authorizations = new Authorizations(auth.split(",")); -// } -// String table = RdfCloudTripleStoreUtils.layoutToTable(tableLayout, conf); -// result = createScanner(connector, authorizations, table, context, startTime, ttl, ranges); -//// if (isBatchScanner) { -//// ((BatchScanner) scanner).setRanges(ranges); -//// } else { -//// for (Range range : ranges) { -//// ((Scanner) scanner).setRange(range); //TODO: Not good way of doing this -//// } -//// } -//// -//// if (isBatchScanner) { -//// result = ((BatchScanner) scanner).iterator(); -//// } else { -//// result = ((Scanner) scanner).iterator(); -//// } -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// protected Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> createScanner(Connector connector, Authorizations authorizations, String table, Resource context, Long startTime, Long ttl, Collection<Range> ranges) throws TableNotFoundException, IOException { -//// ShardedConnector shardedConnector = new ShardedConnector(connector, 4, ta) -// if (rangeMap.ranges.size() > (numOfThreads / 2)) { //TODO: Arbitrary number, make configurable -// BatchScanner scannerBase = connector.createBatchScanner(table, authorizations, numOfThreads); -// scannerBase.setRanges(ranges); -// populateScanner(context, startTime, ttl, scannerBase); -// return scannerBase.iterator(); -// } else { -// isBatchScanner = false; -// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>>[] iters = new Iterator[ranges.size()]; -// int i = 0; -// for (Range range : ranges) { -// Scanner scannerBase = connector.createScanner(table, authorizations); -// populateScanner(context, startTime, ttl, scannerBase); -// scannerBase.setRange(range); -// iters[i] = scannerBase.iterator(); -// i++; -// scanner = scannerBase; //TODO: Always overridden, but doesn't matter since Scanner doesn't need to be closed -// } -// return Iterators.concat(iters); -// } -// -// } -// -// protected void populateScanner(Resource context, Long startTime, Long ttl, ScannerBase scannerBase) throws IOException { -// if (context != null) { //default graph -// context_txt = new Text(writeValue(context)); -// scannerBase.fetchColumnFamily(context_txt); -// } -// -//// if (!isQueryTimeBased(conf)) { -// if (startTime != null && ttl != null) { -//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); -//// scannerBase.setScanIteratorOption("filteringIterator", "0", TimeRangeFilter.class.getName()); -//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.TIME_RANGE_PROP, ttl); -//// scannerBase.setScanIteratorOption("filteringIterator", "0." + TimeRangeFilter.START_TIME_PROP, startTime); -// IteratorSetting setting = new IteratorSetting(1, "fi", TimestampFilter.class.getName()); -// TimestampFilter.setStart(setting, startTime, true); -// TimestampFilter.setEnd(setting, startTime + ttl, true); -// scannerBase.addScanIterator(setting); -// } else if (ttl != null) { -//// scannerBase.setScanIterators(1, FilteringIterator.class.getName(), "filteringIterator"); -//// scannerBase.setScanIteratorOption("filteringIterator", "0", AgeOffFilter.class.getName()); -//// scannerBase.setScanIteratorOption("filteringIterator", "0.ttl", ttl); -// IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName()); -// AgeOffFilter.setTTL(setting, ttl); -// scannerBase.addScanIterator(setting); -// } -//// } -// } -// -// @Override -// public void close() throws RdfDAOException { -// if (!open) -// return; -// verifyIsOpen(); -// open = false; -// if (scanner != null && isBatchScanner) { -// ((BatchScanner) scanner).close(); -// } -// } -// -// public void verifyIsOpen() throws RdfDAOException { -// if (!open) { -// throw new RdfDAOException("Iterator not open"); -// } -// } -// -// @Override -// public boolean hasNext() throws RdfDAOException { -// try { -// if (!open) -// return false; -// verifyIsOpen(); -// /** -// * For some reason, the result.hasNext returns false -// * once at the end of the iterator, and then true -// * for every subsequent call. -// */ -// hasNext = (hasNext && result.hasNext()); -// return hasNext || ((iter_bss != null) && iter_bss.hasNext()); -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// @Override -// public Entry<Statement, BindingSet> next() throws RdfDAOException { -// try { -// if (!this.hasNext()) -// return null; -// -// return getStatement(result, contexts); -// } catch (Exception e) { -// throw new RdfDAOException(e); -// } -// } -// -// public Entry<Statement, BindingSet> getStatement( -// Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> rowResults, -// Resource... filterContexts) throws IOException { -// try { -// while (true) { -// if (iter_bss != null && iter_bss.hasNext()) { -// return new RdfCloudTripleStoreUtils.CustomEntry<Statement, BindingSet>(statement, iter_bss.next()); -// } -// -// if (rowResults.hasNext()) { -// Entry<Key, org.apache.accumulo.core.data.Value> entry = rowResults.next(); -// Key key = entry.getKey(); -// ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); -// statement = RdfCloudTripleStoreUtils.translateStatementFromRow(input, key.getColumnFamily(), tableLayout, RdfCloudTripleStoreConstants.VALUE_FACTORY); -// iter_bss = rangeMap.containsKey(key).iterator(); -// } else -// break; -// } -// } catch (Exception e) { -// throw new IOException(e); -// } -// return null; -// } -// -// @Override -// public void remove() throws RdfDAOException { -// next(); -// } -// -// public int getNumOfThreads() { -// return numOfThreads; -// } -// -// public void setNumOfThreads(int numOfThreads) { -// this.numOfThreads = numOfThreads; -// } -// -// public DefineTripleQueryRangeFactory getQueryRangeFactory() { -// return queryRangeFactory; -// } -// -// public void setQueryRangeFactory(DefineTripleQueryRangeFactory queryRangeFactory) { -// this.queryRangeFactory = queryRangeFactory; -// } -//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java deleted file mode 100644 index 157fc5a..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRdfUtils.java +++ /dev/null @@ -1,72 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.resolver.triple.TripleRow; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; - -/** - * Class AccumuloRdfUtils - * Date: Mar 1, 2012 - * Time: 7:15:54 PM - */ -public class AccumuloRdfUtils { - private static final Log logger = LogFactory.getLog(AccumuloRdfUtils.class); - - public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws AccumuloException, AccumuloSecurityException, TableExistsException { - boolean tableExists = tableOperations.exists(tableName); - if (!tableExists) { - logger.debug("Creating accumulo table: " + tableName); - tableOperations.create(tableName); - } - } - - public static Key from(TripleRow tripleRow) { - return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), - defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), - defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); - } - - public static Value extractValue(TripleRow tripleRow) { - return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); - } - - private static byte[] defaultTo(byte[] bytes, byte[] def) { - return bytes != null ? bytes : def; - } - - private static Long defaultTo(Long l, Long def) { - return l != null ? l : def; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java deleted file mode 100644 index 195030e..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/AccumuloRyaDAO.java +++ /dev/null @@ -1,551 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.accumulo.AccumuloRdfConstants.ALL_AUTHORIZATIONS; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.INFO_NAMESPACE_TXT; -import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_MEMORY; -import static mvm.rya.api.RdfCloudTripleStoreConstants.MAX_TIME; -import static mvm.rya.api.RdfCloudTripleStoreConstants.NUM_THREADS; -import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; -import static mvm.rya.api.RdfCloudTripleStoreConstants.RTS_VERSION_PREDICATE_RYA; -import static mvm.rya.api.RdfCloudTripleStoreConstants.VERSION_RYA; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchDeleter; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.openrdf.model.Namespace; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; - -import info.aduna.iteration.CloseableIteration; -import mvm.rya.accumulo.experimental.AccumuloIndexer; -import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.layout.TableLayoutStrategy; -import mvm.rya.api.persist.RyaDAO; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.persist.RyaNamespaceManager; -import mvm.rya.api.resolver.RyaTripleContext; - -public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { - private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); - - private boolean initialized = false; - private boolean flushEachUpdate = true; - private Connector connector; - private BatchWriterConfig batchWriterConfig; - - private MultiTableBatchWriter mt_bw; - - // Do not flush these individually - private BatchWriter bw_spo; - private BatchWriter bw_po; - private BatchWriter bw_osp; - - private BatchWriter bw_ns; - - private List<AccumuloIndexer> secondaryIndexers; - - private AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - private RyaTableMutationsFactory ryaTableMutationsFactory; - private TableLayoutStrategy tableLayoutStrategy; - private AccumuloRyaQueryEngine queryEngine; - private RyaTripleContext ryaContext; - - @Override - public boolean isInitialized() throws RyaDAOException { - return initialized; - } - - @Override - public void init() throws RyaDAOException { - if (initialized) { - return; - } - try { - checkNotNull(conf); - checkNotNull(connector); - - if(batchWriterConfig == null){ - batchWriterConfig = new BatchWriterConfig(); - batchWriterConfig.setMaxMemory(MAX_MEMORY); - batchWriterConfig.setTimeout(MAX_TIME, TimeUnit.MILLISECONDS); - batchWriterConfig.setMaxWriteThreads(NUM_THREADS); - } - - tableLayoutStrategy = conf.getTableLayoutStrategy(); - ryaContext = RyaTripleContext.getInstance(conf); - ryaTableMutationsFactory = new RyaTableMutationsFactory(ryaContext); - - secondaryIndexers = conf.getAdditionalIndexers(); - - flushEachUpdate = conf.flushEachUpdate(); - - TableOperations tableOperations = connector.tableOperations(); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); - AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); - - for (AccumuloIndexer index : secondaryIndexers) { - index.setConf(conf); - } - - mt_bw = connector.createMultiTableBatchWriter(batchWriterConfig); - - //get the batch writers for tables - bw_spo = mt_bw.getBatchWriter(tableLayoutStrategy.getSpo()); - bw_po = mt_bw.getBatchWriter(tableLayoutStrategy.getPo()); - bw_osp = mt_bw.getBatchWriter(tableLayoutStrategy.getOsp()); - - bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs()); - - for (AccumuloIndexer index : secondaryIndexers) { - index.setConnector(connector); - index.setMultiTableBatchWriter(mt_bw); - index.init(); - } - - queryEngine = new AccumuloRyaQueryEngine(connector, conf); - - checkVersion(); - - initialized = true; - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public String getVersion() throws RyaDAOException { - String version = null; - CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); - if (versIter.hasNext()) { - version = versIter.next().getObject().getData(); - } - versIter.close(); - - return version; - } - - @Override - public void add(RyaStatement statement) throws RyaDAOException { - commit(Iterators.singletonIterator(statement)); - } - - @Override - public void add(Iterator<RyaStatement> iter) throws RyaDAOException { - commit(iter); - } - - @Override - public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { - this.delete(Iterators.singletonIterator(stmt), aconf); - } - - @Override - public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException { - try { - while (statements.hasNext()) { - RyaStatement stmt = statements.next(); - //query first - CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); - while (query.hasNext()) { - deleteSingleRyaStatement(query.next()); - } - - for (AccumuloIndexer index : secondaryIndexers) { - index.deleteStatement(stmt); - } - } - if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException { - BatchDeleter bd_spo = null; - BatchDeleter bd_po = null; - BatchDeleter bd_osp = null; - - try { - bd_spo = createBatchDeleter(tableLayoutStrategy.getSpo(), conf.getAuthorizations()); - bd_po = createBatchDeleter(tableLayoutStrategy.getPo(), conf.getAuthorizations()); - bd_osp = createBatchDeleter(tableLayoutStrategy.getOsp(), conf.getAuthorizations()); - - bd_spo.setRanges(Collections.singleton(new Range())); - bd_po.setRanges(Collections.singleton(new Range())); - bd_osp.setRanges(Collections.singleton(new Range())); - - for (RyaURI graph : graphs){ - bd_spo.fetchColumnFamily(new Text(graph.getData())); - bd_po.fetchColumnFamily(new Text(graph.getData())); - bd_osp.fetchColumnFamily(new Text(graph.getData())); - } - - bd_spo.delete(); - bd_po.delete(); - bd_osp.delete(); - - //TODO indexers do not support delete-UnsupportedOperation Exception will be thrown -// for (AccumuloIndex index : secondaryIndexers) { -// index.dropGraph(graphs); -// } - - } catch (Exception e) { - throw new RyaDAOException(e); - } finally { - if (bd_spo != null) { - bd_spo.close(); - } - if (bd_po != null) { - bd_po.close(); - } - if (bd_osp != null) { - bd_osp.close(); - } - } - - } - - protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException { - Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt); - bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO)); - bw_po.addMutations(map.get(TABLE_LAYOUT.PO)); - bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP)); - } - - protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { - try { - //TODO: Should have a lock here in case we are adding and committing at the same time - while (commitStatements.hasNext()) { - RyaStatement stmt = commitStatements.next(); - - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); - bw_spo.addMutations(spo); - bw_po.addMutations(po); - bw_osp.addMutations(osp); - - for (AccumuloIndexer index : secondaryIndexers) { - index.storeStatement(stmt); - } - } - - if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public void destroy() throws RyaDAOException { - if (!initialized) { - return; - } - //TODO: write lock - try { - initialized = false; - mt_bw.flush(); - - mt_bw.close(); - } catch (Exception e) { - throw new RyaDAOException(e); - } - for(AccumuloIndexer indexer : this.secondaryIndexers) { - try { - indexer.destroy(); - } catch(Exception e) { - logger.warn("Failed to destroy indexer", e); - } - } - } - - @Override - public void addNamespace(String pfx, String namespace) throws RyaDAOException { - try { - Mutation m = new Mutation(new Text(pfx)); - m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); - bw_ns.addMutation(m); - if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public String getNamespace(String pfx) throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); - scanner.setRange(new Range(new Text(pfx))); - Iterator<Map.Entry<Key, Value>> iterator = scanner - .iterator(); - - if (iterator.hasNext()) { - return new String(iterator.next().getValue().get()); - } - } catch (Exception e) { - throw new RyaDAOException(e); - } - return null; - } - - @Override - public void removeNamespace(String pfx) throws RyaDAOException { - try { - Mutation del = new Mutation(new Text(pfx)); - del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); - bw_ns.addMutation(del); - if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { - try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), - ALL_AUTHORIZATIONS); - scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); - Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); - return new AccumuloNamespaceTableIterator(result); - } catch (Exception e) { - throw new RyaDAOException(e); - } - } - - @Override - public RyaNamespaceManager<AccumuloRdfConfiguration> getNamespaceManager() { - return this; - } - - @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - for (String tableName : getTables()) { - try { - purge(tableName, configuration.getAuths()); - compact(tableName); - } catch (TableNotFoundException e) { - logger.error(e.getMessage()); - } catch (MutationsRejectedException e) { - logger.error(e.getMessage()); - } - } - for(AccumuloIndexer indexer : this.secondaryIndexers) { - try { - indexer.purge(configuration); - } catch(Exception e) { - logger.error("Failed to purge indexer", e); - } - } - } - - @Override - public void dropAndDestroy() throws RyaDAOException { - for (String tableName : getTables()) { - try { - drop(tableName); - } catch (AccumuloSecurityException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (AccumuloException e) { - logger.error(e.getMessage()); - throw new RyaDAOException(e); - } catch (TableNotFoundException e) { - logger.warn(e.getMessage()); - } - } - destroy(); - for(AccumuloIndexer indexer : this.secondaryIndexers) { - try { - indexer.dropAndDestroy(); - } catch(Exception e) { - logger.error("Failed to drop and destroy indexer", e); - } - } - } - - public Connector getConnector() { - return connector; - } - - public void setConnector(Connector connector) { - this.connector = connector; - } - - public BatchWriterConfig getBatchWriterConfig(){ - return batchWriterConfig; - } - - public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) { - this.batchWriterConfig = batchWriterConfig; - } - - protected MultiTableBatchWriter getMultiTableBatchWriter(){ - return mt_bw; - } - - @Override - public AccumuloRdfConfiguration getConf() { - return conf; - } - - @Override - public void setConf(AccumuloRdfConfiguration conf) { - this.conf = conf; - } - - public RyaTableMutationsFactory getRyaTableMutationsFactory() { - return ryaTableMutationsFactory; - } - - public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { - this.ryaTableMutationsFactory = ryaTableMutationsFactory; - } - - @Override - public AccumuloRyaQueryEngine getQueryEngine() { - return queryEngine; - } - - public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) { - this.queryEngine = queryEngine; - } - - public void flush() throws RyaDAOException { - try { - mt_bw.flush(); - } catch (MutationsRejectedException e) { - throw new RyaDAOException(e); - } - } - - protected String[] getTables() { - // core tables - List<String> tableNames = Lists.newArrayList( - tableLayoutStrategy.getSpo(), - tableLayoutStrategy.getPo(), - tableLayoutStrategy.getOsp(), - tableLayoutStrategy.getNs(), - tableLayoutStrategy.getEval()); - - // Additional Tables - for (AccumuloIndexer index : secondaryIndexers) { - tableNames.add(index.getTableName()); - } - - return tableNames.toArray(new String[]{}); - } - - private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { - if (tableExists(tableName)) { - logger.info("Purging accumulo table: " + tableName); - BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); - try { - batchDeleter.setRanges(Collections.singleton(new Range())); - batchDeleter.delete(); - } finally { - batchDeleter.close(); - } - } - } - - private void compact(String tableName) { - logger.info("Requesting major compaction for table " + tableName); - try { - connector.tableOperations().compact(tableName, null, null, true, false); - } catch (Exception e) { - logger.error(e.getMessage()); - } - } - - private boolean tableExists(String tableName) { - return getConnector().tableOperations().exists(tableName); - } - - private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { - return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); - } - - private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException { - String version = getVersion(); - if (version == null) { - //adding to core Rya tables but not Indexes - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement()); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); - bw_spo.addMutations(spo); - bw_po.addMutations(po); - bw_osp.addMutations(osp); - } - //TODO: Do a version check here - } - - protected RyaStatement getVersionRyaStatement() { - return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); - } - - private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - logger.info("Dropping cloudbase table: " + tableName); - connector.tableOperations().delete(tableName); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java deleted file mode 100644 index b5a4e84..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/DefineTripleQueryRangeFactory.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -//package mvm.rya.accumulo; - -// -//import com.google.common.io.ByteArrayDataOutput; -//import com.google.common.io.ByteStreams; -//import mvm.rya.api.RdfCloudTripleStoreUtils; -//import mvm.rya.api.domain.RangeValue; -//import org.apache.accumulo.core.data.Range; -//import org.apache.hadoop.io.Text; -//import org.openrdf.model.Value; -//import org.openrdf.model.ValueFactory; -//import org.openrdf.model.impl.ValueFactoryImpl; -// -//import java.io.IOException; -//import java.util.Map; -// -//import static mvm.rya.api.RdfCloudTripleStoreConstants.*; -//import static mvm.rya.api.RdfCloudTripleStoreUtils.CustomEntry; -// -///** -// * Class DefineTripleQueryRangeFactory -// * Date: Jun 2, 2011 -// * Time: 10:35:43 AM -// */ -//public class DefineTripleQueryRangeFactory { -// -// ValueFactory vf = ValueFactoryImpl.getInstance(); -// -// protected void fillRange(ByteArrayDataOutput startRowOut, ByteArrayDataOutput endRowOut, Value val, boolean empty) -// throws IOException { -// if(!empty) { -// startRowOut.write(DELIM_BYTES); -// endRowOut.write(DELIM_BYTES); -// } -// //null check? -// if(val instanceof RangeValue) { -// RangeValue rangeValue = (RangeValue) val; -// Value start = rangeValue.getStart(); -// Value end = rangeValue.getEnd(); -// byte[] start_val_bytes = RdfCloudTripleStoreUtils.writeValue(start); -// byte[] end_val_bytes = RdfCloudTripleStoreUtils.writeValue(end); -// startRowOut.write(start_val_bytes); -// endRowOut.write(end_val_bytes); -// } else { -// byte[] val_bytes = RdfCloudTripleStoreUtils.writeValue(val); -// startRowOut.write(val_bytes); -// endRowOut.write(val_bytes); -// } -// } -// -// public Map.Entry<TABLE_LAYOUT, Range> defineRange(Value subject, Value predicate, Value object, AccumuloRdfConfiguration conf) -// throws IOException { -// -// byte[] startrow, stoprow; -// ByteArrayDataOutput startRowOut = ByteStreams.newDataOutput(); -// ByteArrayDataOutput stopRowOut = ByteStreams.newDataOutput(); -// Range range; -// TABLE_LAYOUT tableLayout; -// -// if (subject != null) { -// /** -// * Case: s -// * Table: spo -// * Want this to be the first if statement since it will be most likely the most asked for table -// */ -// tableLayout = TABLE_LAYOUT.SPO; -// fillRange(startRowOut, stopRowOut, subject, true); -// if (predicate != null) { -// /** -// * Case: sp -// * Table: spo -// */ -// fillRange(startRowOut, stopRowOut, predicate, false); -// if (object != null) { -// /** -// * Case: spo -// * Table: spo -// */ -// fillRange(startRowOut, stopRowOut, object, false); -// } -// } else if (object != null) { -// /** -// * Case: so -// * Table: osp -// * Very rare case. Could have put this in the OSP if clause, but I wanted to reorder the if statement -// * for best performance. The SPO table probably gets the most scans, so I want it to be the first if -// * statement in the branch. -// */ -// tableLayout = TABLE_LAYOUT.OSP; -// startRowOut = ByteStreams.newDataOutput(); -// stopRowOut = ByteStreams.newDataOutput(); -// fillRange(startRowOut, stopRowOut, object, true); -// fillRange(startRowOut, stopRowOut, subject, false); -// } -// } else if (predicate != null) { -// /** -// * Case: p -// * Table: po -// * Wanted this to be the second if statement, since it will be the second most asked for table -// */ -// tableLayout = TABLE_LAYOUT.PO; -// fillRange(startRowOut, stopRowOut, predicate, true); -// if (object != null) { -// /** -// * Case: po -// * Table: po -// */ -// fillRange(startRowOut, stopRowOut, object, false); -// } -// } else if (object != null) { -// /** -// * Case: o -// * Table: osp -// * Probably a pretty rare scenario -// */ -// tableLayout = TABLE_LAYOUT.OSP; -// fillRange(startRowOut, stopRowOut, object, true); -// } else { -// tableLayout = TABLE_LAYOUT.SPO; -// stopRowOut.write(Byte.MAX_VALUE); -// } -// -// startrow = startRowOut.toByteArray(); -// stopRowOut.write(DELIM_STOP_BYTES); -// stoprow = stopRowOut.toByteArray(); -// Text startRowTxt = new Text(startrow); -// Text stopRowTxt = new Text(stoprow); -// range = new Range(startRowTxt, stopRowTxt); -// -// return new CustomEntry<TABLE_LAYOUT, Range>(tableLayout, range); -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java deleted file mode 100644 index 574029e..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableKeyValues.java +++ /dev/null @@ -1,115 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; - -import java.io.IOException; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; - -public class RyaTableKeyValues { - public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); - public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); - - RyaTripleContext instance; - - private RyaStatement stmt; - private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); - private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); - - public RyaTableKeyValues(RyaStatement stmt, RdfCloudTripleStoreConfiguration conf) { - this.stmt = stmt; - this.instance = RyaTripleContext.getInstance(conf); - } - - public Collection<Map.Entry<Key, Value>> getSpo() { - return spo; - } - - public Collection<Map.Entry<Key, Value>> getPo() { - return po; - } - - public Collection<Map.Entry<Key, Value>> getOsp() { - return osp; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public RyaTableKeyValues invoke() throws IOException { - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? - */try { - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - timestamp = timestamp == null ? 0l : timestamp; - byte[] value = tripleRow.getValue(); - Value v = value == null ? EMPTY_VALUE : new Value(value); - spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); - po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); - osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), - new Text(tripleRow.getColumnFamily()), - new Text(tripleRow.getColumnQualifier()), - cv, timestamp), v)); - } catch (TripleRowResolverException e) { - throw new IOException(e); - } - return this; - } - - @Override - public String toString() { - return "RyaTableKeyValues{" + - "statement=" + stmt + - ", spo=" + spo + - ", po=" + po + - ", o=" + osp + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java deleted file mode 100644 index 2a4871d..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/RyaTableMutationsFactory.java +++ /dev/null @@ -1,148 +0,0 @@ -package mvm.rya.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.io.Text; - -public class RyaTableMutationsFactory { - - RyaTripleContext ryaContext; - - public RyaTableMutationsFactory(RyaTripleContext ryaContext) { - this.ryaContext = ryaContext; - } - - //TODO: Does this still need to be collections - public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( - RyaStatement stmt) throws IOException { - - Collection<Mutation> spo_muts = new ArrayList<Mutation>(); - Collection<Mutation> po_muts = new ArrayList<Mutation>(); - Collection<Mutation> osp_muts = new ArrayList<Mutation>(); - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? - */ - try { - Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); - spo_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.PO); - po_muts.add(createMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.OSP); - osp_muts.add(createMutation(tripleRow)); - } catch (TripleRowResolverException fe) { - throw new IOException(fe); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = - new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); - - return mutations; - } - - public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serializeDelete( - RyaStatement stmt) throws IOException { - - Collection<Mutation> spo_muts = new ArrayList<Mutation>(); - Collection<Mutation> po_muts = new ArrayList<Mutation>(); - Collection<Mutation> osp_muts = new ArrayList<Mutation>(); - /** - * TODO: If there are contexts, do we still replicate the information into the default graph as well - * as the named graphs? - */ - try { - Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); - TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); - spo_muts.add(deleteMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.PO); - po_muts.add(deleteMutation(tripleRow)); - tripleRow = rowMap.get(TABLE_LAYOUT.OSP); - osp_muts.add(deleteMutation(tripleRow)); - } catch (TripleRowResolverException fe) { - throw new IOException(fe); - } - - Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = - new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); - mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); - - return mutations; - - } - - protected Mutation deleteMutation(TripleRow tripleRow) { - Mutation m = new Mutation(new Text(tripleRow.getRow())); - - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - - m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), - tripleRow.getTimestamp()); - return m; - } - - protected Mutation createMutation(TripleRow tripleRow) { - Mutation mutation = new Mutation(new Text(tripleRow.getRow())); - byte[] columnVisibility = tripleRow.getColumnVisibility(); - ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - Long timestamp = tripleRow.getTimestamp(); - byte[] value = tripleRow.getValue(); - Value v = value == null ? EMPTY_VALUE : new Value(value); - byte[] columnQualifier = tripleRow.getColumnQualifier(); - Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - byte[] columnFamily = tripleRow.getColumnFamily(); - Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - mutation.put(cfText, cqText, cv, timestamp, v); - return mutation; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java deleted file mode 100644 index 5df5da9..0000000 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/experimental/AbstractAccumuloIndexer.java +++ /dev/null @@ -1,59 +0,0 @@ -package mvm.rya.accumulo.experimental; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.IOException; -import java.util.Collection; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; - -import org.apache.accumulo.core.client.MultiTableBatchWriter; - -public abstract class AbstractAccumuloIndexer implements AccumuloIndexer { - - @Override - public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { - } - - @Override - public void storeStatements(Collection<RyaStatement> statements) throws IOException { - for (RyaStatement s : statements) { - storeStatement(s); - } - } - - @Override - public void deleteStatement(RyaStatement stmt) throws IOException { - } - - @Override - public void dropGraph(RyaURI... graphs) { - } - - @Override - public void flush() throws IOException { - } - - @Override - public void close() throws IOException { - } -}