http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java new file mode 100644 index 0000000..2a09669 --- /dev/null +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/Upgrade322ToolTest.java @@ -0,0 +1,318 @@ +package mvm.rya.accumulo.mr.upgrade; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import junit.framework.TestCase; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.query.AccumuloRyaQueryEngine; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.query.RyaQuery; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.*; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.calrissian.mango.collect.CloseableIterable; +import org.openrdf.model.vocabulary.XMLSchema; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +/** + * Created by IntelliJ IDEA. + * Date: 4/25/12 + * Time: 10:51 AM + * To change this template use File | Settings | File Templates. 
+ */ +public class Upgrade322ToolTest extends TestCase { + + private String user = "user"; + private String pwd = "pwd"; + private String instance = "myinstance"; + private String tablePrefix = "t_"; + private Authorizations auths = Constants.NO_AUTHS; + private Connector connector; + + @Override + public void setUp() throws Exception { + super.setUp(); + + final String spoTable = tablePrefix + + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + final String poTable = tablePrefix + + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; + final String ospTable = tablePrefix + + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; + + connector = new MockInstance(instance).getConnector(user, pwd.getBytes()); + + connector.tableOperations().create(spoTable); + connector.tableOperations().create(poTable); + connector.tableOperations().create(ospTable); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); + SecurityOperations secOps = connector.securityOperations(); + secOps.createUser(user, pwd.getBytes(), auths); + secOps.grantTablePermission(user, spoTable, TablePermission.READ); + secOps.grantTablePermission(user, poTable, TablePermission.READ); + secOps.grantTablePermission(user, ospTable, TablePermission.READ); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.READ); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX, TablePermission.WRITE); + + //load data + final BatchWriter ospWriter = connector + .createBatchWriter(ospTable, new BatchWriterConfig()); + 
ospWriter.addMutation(getMutation("00000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u0001\u0004")); + ospWriter.addMutation(getMutation("00000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u0001\u0005")); + ospWriter.addMutation(getMutation("00000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u0001\t")); + ospWriter.addMutation(getMutation("00001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u0001\u0006")); + ospWriter.addMutation(getMutation("10\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" + + "://here/2010/tracked-data-provenance/ns#shortLit\u0001http://www.w3" + + ".org/2001/XMLSchema#short\u0001\b")); + ospWriter.addMutation(getMutation("10.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" + + "://www.w3.org/2001/XMLSchema#float\u0001\b")); + ospWriter.addMutation(getMutation("3.0.0\u0000urn:mvm.rya/2012/05#rts\u0000urn:mvm" + + ".rya/2012/05#version\u0001\u0003")); + ospWriter.addMutation(getMutation("9223370726404375807\u0000http://here/2010/tracked-data-provenance/ns" + + "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" + + "\u0001\u0007")); + ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#Created\u0000http://here" + + "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" + + ".org/1999/02/22-rdf-syntax-ns#type\u0001\u0002")); + ospWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http" + + "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" + + "/tracked-data-provenance/ns#uriLit\u0001\u0002")); + 
ospWriter.addMutation(getMutation("stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0001" + + "\u0003")); + ospWriter.addMutation(getMutation("true\u0000http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0001\n")); + ospWriter.flush(); + ospWriter.close(); + + final BatchWriter spoWriter = connector + .createBatchWriter(spoTable, new BatchWriterConfig()); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0001\u0004")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0001\u0005")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0001\t")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0001\u0006")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10\u0000http" + + "://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0001http://www.w3" + + ".org/2001/XMLSchema#short\u0001\b")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#floatLit\u0001http" + + "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b")); + spoWriter.addMutation(getMutation("urn:mvm.rya/2012/05#rts\u0000urn:mvm" + + ".rya/2012/05#version\u00003.0.0\u0001\u0003")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns" + + "#uuid10\u0000http://here/2010/tracked-data-provenance/ns#dateLit" + + 
"\u00009223370726404375807\u0001\u0007")); + spoWriter.addMutation(getMutation("http://here" + + "/2010/tracked-data-provenance/ns#uuid10\u0000http://www.w3" + + ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0001\u0002")); + spoWriter.addMutation(getMutation("http" + + "://here/2010/tracked-data-provenance/ns#uuid10\u0000http://here/2010" + + "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0001\u0002")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0001" + + "\u0003")); + spoWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#uuid10" + + "\u0000http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0001\n")); + spoWriter.flush(); + spoWriter.close(); + + final BatchWriter poWriter = connector + .createBatchWriter(poTable, new BatchWriterConfig()); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#longLit\u000000000000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0004")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#intLit\u000000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0005")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#byteLit\u000000000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\t")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#doubleLit\u000000001 1.0\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0006")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#shortLit\u000010\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http://www.w3" + + ".org/2001/XMLSchema#short\u0001\b")); + 
poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#floatLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001http" + + "://www.w3.org/2001/XMLSchema#float\u000010.0\u0001\b")); + poWriter.addMutation(getMutation("urn:mvm" + + ".rya/2012/05#version\u00003.0.0\u0000urn:mvm.rya/2012/05#rts\u0001\u0003")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#dateLit" + + "\u00009223370726404375807\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0007")); + poWriter.addMutation(getMutation("http://www.w3" + + ".org/1999/02/22-rdf-syntax-ns#type\u0000http://here/2010/tracked-data-provenance/ns#Created\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002")); + poWriter.addMutation(getMutation("http://here/2010" + + "/tracked-data-provenance/ns#uriLit\u0000http://here/2010/tracked-data-provenance/ns#objectuuid1\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\u0002")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#stringLit\u0000stringLit\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001" + + "\u0003")); + poWriter.addMutation(getMutation("http://here/2010/tracked-data-provenance/ns#booleanLit\u0000true\u0000http://here/2010/tracked-data-provenance/ns#uuid10\u0001\n")); + poWriter.flush(); + poWriter.close(); + } + + public Mutation getMutation(String row) { + final Mutation mutation = new Mutation(row); + mutation.put("", "", ""); + return mutation; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + connector.tableOperations().delete( + tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); + 
connector.tableOperations().delete( + tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); + } + + public void testUpgrade() throws Exception { + Upgrade322Tool.main(new String[]{ + "-Dac.mock=true", + "-Dac.instance=" + instance, + "-Dac.username=" + user, + "-Dac.pwd=" + pwd, + "-Drdf.tablePrefix=" + tablePrefix, + }); + + final AccumuloRdfConfiguration configuration = new AccumuloRdfConfiguration(); + configuration.setTablePrefix(tablePrefix); + final AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO(); + ryaDAO.setConnector(connector); + ryaDAO.setConf(configuration); + ryaDAO.init(); + + final AccumuloRyaQueryEngine queryEngine = ryaDAO.getQueryEngine(); + + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#booleanLit"), + new RyaType(XMLSchema.BOOLEAN, "true")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#longLit"), + new RyaType(XMLSchema.LONG, "10")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#intLit"), + new RyaType(XMLSchema.INTEGER, "10")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#byteLit"), + new RyaType(XMLSchema.BYTE, "10")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#doubleLit"), + new RyaType(XMLSchema.DOUBLE, "10.0")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#dateLit"), + new RyaType(XMLSchema.DATETIME, "2011-07-12T06:00:00.000Z")), 
queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#stringLit"), + new RyaType("stringLit")), queryEngine); + verify(new RyaStatement( + new RyaURI("http://here/2010/tracked-data-provenance/ns#uuid10"), + new RyaURI("http://here/2010/tracked-data-provenance/ns#uriLit"), + new RyaURI("http://here/2010/tracked-data-provenance/ns" + + "#objectuuid1")), queryEngine); + verify(new RyaStatement( + new RyaURI("urn:mvm.rya/2012/05#rts"), + new RyaURI("urn:mvm.rya/2012/05#version"), + new RyaType("3.0.0")), queryEngine); + } + + private void verify(RyaStatement ryaStatement, AccumuloRyaQueryEngine queryEngine) + throws RyaDAOException, IOException { + + //check osp + CloseableIterable<RyaStatement> statements = + queryEngine.query(RyaQuery.builder(new RyaStatement(null, null, ryaStatement.getObject())) + .build()); + try { + verifyFirstStatement(ryaStatement, statements); + } finally { + statements.close(); + } + + //check po + statements = queryEngine.query(RyaQuery.builder( + new RyaStatement(null, ryaStatement.getPredicate(), + ryaStatement.getObject())).build()); + try { + verifyFirstStatement(ryaStatement, statements); + } finally { + statements.close(); + } + + //check spo + statements = queryEngine.query(RyaQuery.builder( + new RyaStatement(ryaStatement.getSubject(), + ryaStatement.getPredicate(), + ryaStatement.getObject())).build()); + try { + verifyFirstStatement(ryaStatement, statements); + } finally { + statements.close(); + } + } + + private void verifyFirstStatement( + RyaStatement ryaStatement, CloseableIterable<RyaStatement> statements) { + final Iterator<RyaStatement> iterator = statements.iterator(); + assertTrue(iterator.hasNext()); + final RyaStatement first = iterator.next(); + assertEquals(ryaStatement.getSubject(), first.getSubject()); + assertEquals(ryaStatement.getPredicate(), first.getPredicate()); + 
assertEquals(ryaStatement.getObject(), first.getObject()); + assertFalse(iterator.hasNext()); + } + + public void printTableData(String tableName) + throws TableNotFoundException{ + Scanner scanner = connector.createScanner(tableName, auths); + scanner.setRange(new Range()); + for(Map.Entry<Key, Value> entry : scanner) { + final Key key = entry.getKey(); + final Value value = entry.getValue(); + System.out.println(key.getRow() + " " + key.getColumnFamily() + " " + key.getColumnQualifier() + " " + key.getTimestamp() + " " + value.toString()); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java new file mode 100644 index 0000000..027bd7e --- /dev/null +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/upgrade/UpgradeObjectSerializationTest.java @@ -0,0 +1,118 @@ +package mvm.rya.accumulo.mr.upgrade; + +/* + * #%L + * mvm.rya.accumulo.rya + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import mvm.rya.api.resolver.impl.*; +import org.junit.Test; + +import static mvm.rya.accumulo.mr.upgrade.Upgrade322Tool.UpgradeObjectSerialization; +import static org.junit.Assert.*; + +public class UpgradeObjectSerializationTest { + + @Test + public void testBooleanUpgrade() throws Exception { + String object = "true"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, BooleanRyaTypeResolver.BOOLEAN_LITERAL_MARKER); + + assertEquals("1", upgrade); + } + + @Test + public void testBooleanUpgradeFalse() throws Exception { + String object = "false"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, BooleanRyaTypeResolver.BOOLEAN_LITERAL_MARKER); + + assertEquals("0", upgrade); + } + + @Test + public void testByteUpgradeLowest() throws Exception { + String object = "-127"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, ByteRyaTypeResolver.LITERAL_MARKER); + + assertEquals("81", upgrade); + } + + @Test + public void testByteUpgradeHighest() throws Exception { + String object = "127"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, ByteRyaTypeResolver.LITERAL_MARKER); + + assertEquals("7f", upgrade); + } + + @Test + public void testLongUpgrade() throws Exception { + String object = "00000000000000000010"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, LongRyaTypeResolver.LONG_LITERAL_MARKER); + + assertEquals("800000000000000a", upgrade); + } + + @Test + 
public void testIntUpgrade() throws Exception { + String object = "00000000010"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, IntegerRyaTypeResolver.INTEGER_LITERAL_MARKER); + + assertEquals("8000000a", upgrade); + } + + @Test + public void testDateTimeUpgrade() throws Exception { + String object = "9223370726404375807"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, DateTimeRyaTypeResolver.DATETIME_LITERAL_MARKER); + + assertEquals("800001311cee3b00", upgrade); + } + + @Test + public void testDoubleUpgrade() throws Exception { + String object = "00001 1.0"; + final UpgradeObjectSerialization upgradeObjectSerialization + = new UpgradeObjectSerialization(); + final String upgrade = upgradeObjectSerialization + .upgrade(object, DoubleRyaTypeResolver.DOUBLE_LITERAL_MARKER); + + assertEquals("c024000000000000", upgrade); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/resources/namedgraphs.trig ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/resources/namedgraphs.trig b/dao/accumulo.rya/src/test/resources/namedgraphs.trig new file mode 100644 index 0000000..b647632 --- /dev/null +++ b/dao/accumulo.rya/src/test/resources/namedgraphs.trig @@ -0,0 +1,7 @@ +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . +@prefix swp: <http://www.w3.org/2004/03/trix/swp-1/> . +@prefix dc: <http://purl.org/dc/elements/1.1/> . +@prefix ex: <http://www.example.org/vocabulary#> . +@prefix : <http://www.example.org/exampleDocument#> . +:G1 { :Monica ex:name "Monica Murphy" . 
} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/accumulo.rya/src/test/resources/test.ntriples ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/resources/test.ntriples b/dao/accumulo.rya/src/test/resources/test.ntriples new file mode 100644 index 0000000..26a0a17 --- /dev/null +++ b/dao/accumulo.rya/src/test/resources/test.ntriples @@ -0,0 +1 @@ +<urn:lubm:rdfts#GraduateStudent01> <urn:lubm:rdfts#hasFriend> <urn:lubm:rdfts#GraduateStudent02> . \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/pom.xml ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/pom.xml b/dao/cloudbase.rya/pom.xml new file mode 100644 index 0000000..5909e9d --- /dev/null +++ b/dao/cloudbase.rya/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>mvm.rya</groupId> + <artifactId>rya.dao</artifactId> + <version>3.2.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>cloudbase.rya</artifactId> + <name>${project.groupId}.${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.utils</artifactId> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.iterators</artifactId> + </dependency> + + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>rya.indexing</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Cloudbase deps --> + <dependency> + <groupId>cloudbase</groupId> + 
<artifactId>cloudbase-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <exclusions> + <!-- the log4j that comes with zookeeper 3.3.5 has some bad dependencies --> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.texeltek</groupId> + <artifactId>accumulo-cloudbase-shim</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>mvm.rya</groupId> + <artifactId>cloudbase.iterators</artifactId> + <optional>true</optional> + </dependency> + + + <!-- Sesame runtime --> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-ntriples</artifactId> + <version>${openrdf.sesame.version}</version> + </dependency> + + </dependencies> + + <profiles> + <profile> + <id>mr</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java new file mode 100644 index 0000000..7980d85 --- /dev/null +++ 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/BatchScannerIterator.java @@ -0,0 +1,59 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NoSuchElementException; + +/** + * The intention of this iterator is the wrap the iterator that is returned by a + * BatchScan in cloudbase in order to serve as a workaround for + * ACCUMULO-226 (https://issues.apache.org/jira/browse/ACCUMULO-226). The bug + * involves subsequent calls to hasNext() on batch scan results after false has been + * returned will return true + * <p/> + * A patch has been submitted and accepted in Accumulo but this wrapper can be used + * for previous versions of Cloudbase/Accumulo that do not yet have the patch. + */ +public class BatchScannerIterator implements Iterator<Entry<Key, Value>> { + + private Iterator<Entry<Key, Value>> cloudbaseScanner = null; + + private Entry<Key, Value> nextKeyValue = null; + + public BatchScannerIterator(Iterator<Entry<Key, Value>> cloudbaseScanner) { + this.cloudbaseScanner = cloudbaseScanner; + } + + public boolean hasNext() { + if (nextKeyValue == null) { + if (cloudbaseScanner.hasNext()) { + nextKeyValue = cloudbaseScanner.next(); + } + } + return !isTerminatingKeyValue(nextKeyValue); + } + + private boolean isTerminatingKeyValue(Entry<Key, Value> nextEntry) { + if (nextEntry == null) { + return true; + } + return !(nextEntry.getKey() != null && nextEntry.getValue() != null); //Condition taken from cloudbase's TabletServerBatchReaderIterator + } + + public Entry<Key, Value> next() { + if (hasNext()) { + Entry<Key, Value> entry = nextKeyValue; + nextKeyValue = null; + return entry; + } else { + throw new NoSuchElementException(); + } + } + + public void remove() { + cloudbaseScanner.remove(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java new file mode 100644 index 0000000..b20d79c --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseNamespaceTableIterator.java @@ -0,0 +1,78 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import com.google.common.base.Preconditions; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.persist.RdfDAOException; +import org.openrdf.model.Namespace; +import org.openrdf.model.impl.NamespaceImpl; + +import java.io.IOError; +import java.util.Iterator; +import java.util.Map.Entry; + +public class CloudbaseNamespaceTableIterator<T extends Namespace> implements + CloseableIteration<Namespace, RdfDAOException> { + + private boolean open = false; + private Iterator<Entry<Key, Value>> result; + + public CloudbaseNamespaceTableIterator(Iterator<Entry<Key, Value>> result) throws RdfDAOException { + Preconditions.checkNotNull(result); + open = true; + this.result = result; + } + + @Override + public void close() throws RdfDAOException { + try { + verifyIsOpen(); + open = false; + } catch (IOError e) { + throw new RdfDAOException(e); + } + } + + public void verifyIsOpen() throws RdfDAOException { + if (!open) { + throw new RdfDAOException("Iterator not open"); + } + } + + @Override + public boolean hasNext() throws RdfDAOException { + verifyIsOpen(); + return result != null && result.hasNext(); + } + + @Override + public Namespace next() throws RdfDAOException { + if (hasNext()) { + return getNamespace(result); + } + return null; + } + + public static Namespace 
getNamespace(Iterator<Entry<Key, Value>> rowResults) { + for (; rowResults.hasNext(); ) { + Entry<Key, Value> next = rowResults.next(); + Key key = next.getKey(); + Value val = next.getValue(); + String cf = key.getColumnFamily().toString(); + String cq = key.getColumnQualifier().toString(); + return new NamespaceImpl(key.getRow().toString(), new String( + val.get())); + } + return null; + } + + @Override + public void remove() throws RdfDAOException { + next(); + } + + public boolean isOpen() { + return open; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java new file mode 100644 index 0000000..e25c910 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConfiguration.java @@ -0,0 +1,44 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.security.Authorizations; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.hadoop.conf.Configuration; + +/** + * Created by IntelliJ IDEA. + * Date: 4/25/12 + * Time: 3:24 PM + * To change this template use File | Settings | File Templates. 
+ */ +public class CloudbaseRdfConfiguration extends RdfCloudTripleStoreConfiguration { + + public static final String MAXRANGES_SCANNER = "cb.query.maxranges"; + + public CloudbaseRdfConfiguration() { + super(); + } + + public CloudbaseRdfConfiguration(Configuration other) { + super(other); + } + + @Override + public CloudbaseRdfConfiguration clone() { + return new CloudbaseRdfConfiguration(this); + } + + public Authorizations getAuthorizations() { + String[] auths = getAuths(); + if (auths == null || auths.length == 0) + return CloudbaseRdfConstants.ALL_AUTHORIZATIONS; + return new Authorizations(auths); + } + + public void setMaxRangesForScanner(Integer max) { + setInt(MAXRANGES_SCANNER, max); + } + + public Integer getMaxRangesForScanner() { + return getInt(MAXRANGES_SCANNER, 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java new file mode 100644 index 0000000..690a050 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfConstants.java @@ -0,0 +1,20 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.CBConstants; +import cloudbase.core.data.Value; +import cloudbase.core.security.Authorizations; +import cloudbase.core.security.ColumnVisibility; + +/** + * Interface CloudbaseRdfConstants + * Date: Mar 1, 2012 + * Time: 7:24:52 PM + */ +public interface CloudbaseRdfConstants { + public static final Authorizations ALL_AUTHORIZATIONS = CBConstants.NO_AUTHS; + + public static final Value EMPTY_VALUE = new Value(new byte[0]); + + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(new byte[0]); + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java new file mode 100644 index 0000000..075d1fe --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfEvalStatsDAO.java @@ -0,0 +1,138 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.client.Connector; +import cloudbase.core.client.Scanner; +import cloudbase.core.client.admin.TableOperations; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import cloudbase.core.security.Authorizations; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreStatement; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RdfDAOException; +import mvm.rya.api.persist.RdfEvalStatsDAO; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Resource; +import org.openrdf.model.Value; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.api.RdfCloudTripleStoreConstants.*; + +/** + * Class CloudbaseRdfEvalStatsDAO + * Date: Feb 28, 2012 + * Time: 5:03:16 PM + */ +public class CloudbaseRdfEvalStatsDAO implements RdfEvalStatsDAO<CloudbaseRdfConfiguration> { + + private boolean initialized = false; + private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); + + private Collection<RdfCloudTripleStoreStatement> statements = new ArrayList<RdfCloudTripleStoreStatement>(); + private Connector connector; + + // private String evalTable = TBL_EVAL; + private TableLayoutStrategy 
tableLayoutStrategy; + + @Override + public void init() throws RdfDAOException { + try { + if (isInitialized()) { + throw new IllegalStateException("Already initialized"); + } + checkNotNull(connector); + tableLayoutStrategy = conf.getTableLayoutStrategy(); +// evalTable = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); +// conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_EVAL, evalTable); + + TableOperations tos = connector.tableOperations(); + CloudbaseRdfUtils.createTableIfNotExist(tos, tableLayoutStrategy.getEval()); +// boolean tableExists = tos.exists(evalTable); +// if (!tableExists) +// tos.create(evalTable); + initialized = true; + } catch (Exception e) { + throw new RdfDAOException(e); + } + } + + @Override + public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val) throws RdfDAOException { + return this.getCardinality(conf, card, val, null); + } + + @Override + public double getCardinality(CloudbaseRdfConfiguration conf, CARDINALITY_OF card, Value val, Resource context) throws RdfDAOException { + try { + Authorizations authorizations = conf.getAuthorizations(); + Scanner scanner = connector.createScanner(tableLayoutStrategy.getEval(), authorizations); + Text cfTxt = null; + if (CARDINALITY_OF.SUBJECT.equals(card)) { + cfTxt = SUBJECT_CF_TXT; + } else if (CARDINALITY_OF.PREDICATE.equals(card)) { + cfTxt = PRED_CF_TXT; + } else if (CARDINALITY_OF.OBJECT.equals(card)) { +// cfTxt = OBJ_CF_TXT; //TODO: How do we do object cardinality + return Double.MAX_VALUE; + } else throw new IllegalArgumentException("Not right Cardinality[" + card + "]"); + Text cq = EMPTY_TEXT; + if (context != null) { + cq = new Text(context.stringValue().getBytes()); + } + scanner.fetchColumn(cfTxt, cq); + scanner.setRange(new Range(new Text(val.stringValue().getBytes()))); + Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iter = scanner.iterator(); + if (iter.hasNext()) { + return Double.parseDouble(new 
String(iter.next().getValue().get())); + } + } catch (Exception e) { + throw new RdfDAOException(e); + } + + //default + return -1; + } + + @Override + public void destroy() throws RdfDAOException { + if (!isInitialized()) { + throw new IllegalStateException("Not initialized"); + } + initialized = false; + } + + @Override + public boolean isInitialized() throws RdfDAOException { + return initialized; + } + + public Connector getConnector() { + return connector; + } + + public void setConnector(Connector connector) { + this.connector = connector; + } + +// public String getEvalTable() { +// return evalTable; +// } +// +// public void setEvalTable(String evalTable) { +// this.evalTable = evalTable; +// } + + public CloudbaseRdfConfiguration getConf() { + return conf; + } + + public void setConf(CloudbaseRdfConfiguration conf) { + this.conf = conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java new file mode 100644 index 0000000..9114ae8 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRdfUtils.java @@ -0,0 +1,50 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.client.CBException; +import cloudbase.core.client.CBSecurityException; +import cloudbase.core.client.TableExistsException; +import cloudbase.core.client.admin.TableOperations; +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import mvm.rya.api.resolver.triple.TripleRow; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; + +/** + * Class CloudbaseRdfUtils + * Date: Mar 1, 2012 + * Time: 7:15:54 PM + */ +public class 
CloudbaseRdfUtils { + private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class); + + public static void createTableIfNotExist(TableOperations tableOperations, String tableName) throws TableExistsException, CBSecurityException, CBException { + boolean tableExists = tableOperations.exists(tableName); + if (!tableExists) { + logger.info("Creating cloudbase table: " + tableName); + tableOperations.create(tableName); + } + } + + public static Key from(TripleRow tripleRow) { + return new Key(defaultTo(tripleRow.getRow(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnFamily(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnQualifier(), EMPTY_BYTES), + defaultTo(tripleRow.getColumnVisibility(), EMPTY_BYTES), + defaultTo(tripleRow.getTimestamp(), Long.MAX_VALUE)); + } + + public static Value extractValue(TripleRow tripleRow) { + return new Value(defaultTo(tripleRow.getValue(), EMPTY_BYTES)); + } + + private static byte[] defaultTo(byte[] bytes, byte[] def) { + return bytes != null ? bytes : def; + } + + private static Long defaultTo(Long l, Long def) { + return l != null ? 
l : def; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java new file mode 100644 index 0000000..a3045e6 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/CloudbaseRyaDAO.java @@ -0,0 +1,428 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.client.*; +import cloudbase.core.client.Scanner; +import cloudbase.core.client.admin.TableOperations; +import cloudbase.core.client.impl.TabletServerBatchDeleter; +import cloudbase.core.conf.Property; +import cloudbase.core.data.Key; +import cloudbase.core.data.Mutation; +import cloudbase.core.data.Range; +import cloudbase.core.security.Authorizations; +import cloudbase.core.security.ColumnVisibility; +import com.google.common.collect.Iterators; +import info.aduna.iteration.CloseableIteration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.layout.TableLayoutStrategy; +import mvm.rya.api.persist.RyaDAO; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.persist.RyaNamespaceManager; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.cloudbase.query.CloudbaseRyaQueryEngine; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Namespace; + +import java.text.SimpleDateFormat; +import java.util.*; + +import static com.google.common.base.Preconditions.checkNotNull; +import static mvm.rya.api.RdfCloudTripleStoreConstants.*; +import static mvm.rya.cloudbase.CloudbaseRdfConstants.ALL_AUTHORIZATIONS; +import 
static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; + +/** + * Class CloudbaseRyaDAO + * Date: Feb 29, 2012 + * Time: 12:37:22 PM + */ +public class CloudbaseRyaDAO implements RyaDAO<CloudbaseRdfConfiguration>, RyaNamespaceManager<CloudbaseRdfConfiguration> { + private static final Log logger = LogFactory.getLog(CloudbaseRyaDAO.class); + + private boolean initialized = false; + private Connector connector; + + private BatchWriter bw_spo; + private BatchWriter bw_po; + private BatchWriter bw_osp; + private BatchWriter bw_ns; + + private CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); + private ColumnVisibility cv = EMPTY_CV; + private RyaTableMutationsFactory ryaTableMutationsFactory = new RyaTableMutationsFactory(); + private TableLayoutStrategy tableLayoutStrategy; + private CloudbaseRyaQueryEngine queryEngine; + private RyaContext ryaContext = RyaContext.getInstance(); + + @Override + public boolean isInitialized() throws RyaDAOException { + return initialized; + } + + @Override + public void init() throws RyaDAOException { + if (initialized) + return; + try { + checkNotNull(conf); + checkNotNull(connector); + + tableLayoutStrategy = conf.getTableLayoutStrategy(); + String cv_s = conf.getCv(); + if (cv_s != null) { + cv = new ColumnVisibility(cv_s); + } + + TableOperations tableOperations = connector.tableOperations(); + CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); + CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); + CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); + CloudbaseRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); + + //get the batch writers for tables + bw_spo = connector.createBatchWriter(tableLayoutStrategy.getSpo(), MAX_MEMORY, MAX_TIME, + NUM_THREADS); + bw_po = connector.createBatchWriter(tableLayoutStrategy.getPo(), MAX_MEMORY, MAX_TIME, + NUM_THREADS); + bw_osp = 
connector.createBatchWriter(tableLayoutStrategy.getOsp(), MAX_MEMORY, MAX_TIME, + NUM_THREADS); + + bw_ns = connector.createBatchWriter(tableLayoutStrategy.getNs(), MAX_MEMORY, + MAX_TIME, 1); + + queryEngine = new CloudbaseRyaQueryEngine(connector, getConf()); + + checkVersion(); + + initialized = true; + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + public String getVersion() throws RyaDAOException { + String version = null; + CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); + if (versIter.hasNext()) { + version = versIter.next().getObject().getData(); + } + versIter.close(); + + return version; + } + + @Override + public void add(RyaStatement statement) throws RyaDAOException { + commit(Iterators.singletonIterator(statement)); + } + + @Override + public void add(Iterator<RyaStatement> iter) throws RyaDAOException { + commit(iter); + } + + @Override + public void delete(RyaStatement stmt, CloudbaseRdfConfiguration aconf) throws RyaDAOException { + this.delete(Iterators.singletonIterator(stmt), aconf); + } + + @Override + public void delete(Iterator<RyaStatement> statements, CloudbaseRdfConfiguration conf) throws RyaDAOException { + try { + while (statements.hasNext()) { + RyaStatement stmt = statements.next(); + //query first + CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); + while (query.hasNext()) { + deleteSingleRyaStatement(query.next()); + } + } + bw_spo.flush(); + bw_po.flush(); + bw_osp.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + protected void deleteSingleRyaStatement(RyaStatement stmt) throws TripleRowResolverException, MutationsRejectedException { + Map<TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(stmt); + bw_spo.addMutation(deleteMutation(map.get(TABLE_LAYOUT.SPO))); + bw_po.addMutation(deleteMutation(map.get(TABLE_LAYOUT.PO))); + 
bw_osp.addMutation(deleteMutation(map.get(TABLE_LAYOUT.OSP))); + } + + protected Mutation deleteMutation(TripleRow tripleRow) { + Mutation m = new Mutation(new Text(tripleRow.getRow())); + + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); + + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + + m.putDelete(cfText, cqText, new ColumnVisibility(tripleRow.getColumnVisibility()), tripleRow.getTimestamp()); + return m; + } + + protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { + try { + //TODO: Should have a lock here in case we are adding and committing at the same time + while (commitStatements.hasNext()) { + + Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(commitStatements.next()); + Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + bw_spo.addMutations(spo); + bw_po.addMutations(po); + bw_osp.addMutations(osp); + } + + bw_spo.flush(); + bw_po.flush(); + bw_osp.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void destroy() throws RyaDAOException { + if (!initialized) { + return; + } + //TODO: write lock + try { + initialized = false; + bw_osp.flush(); + bw_spo.flush(); + bw_po.flush(); + bw_ns.flush(); + + bw_osp.close(); + bw_spo.close(); + bw_po.close(); + bw_ns.close(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public void addNamespace(String pfx, String namespace) throws RyaDAOException { + try { + Mutation m = new Mutation(new Text(pfx)); + m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new cloudbase.core.data.Value( + namespace.getBytes())); + bw_ns.addMutation(m); + bw_ns.flush(); + } catch (Exception 
e) { + throw new RyaDAOException(e); + } + } + + @Override + public String getNamespace(String pfx) throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); + scanner.setRange(new Range(new Text(pfx))); + Iterator<Map.Entry<Key, cloudbase.core.data.Value>> iterator = scanner + .iterator(); + + if (iterator.hasNext()) { + return new String(iterator.next().getValue().get()); + } + } catch (Exception e) { + throw new RyaDAOException(e); + } + return null; + } + + @Override + public void removeNamespace(String pfx) throws RyaDAOException { + try { + Mutation del = new Mutation(new Text(pfx)); + del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); + bw_ns.addMutation(del); + bw_ns.flush(); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { + try { + Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + ALL_AUTHORIZATIONS); + scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); + Iterator<Map.Entry<Key, cloudbase.core.data.Value>> result = scanner.iterator(); + return new CloudbaseNamespaceTableIterator(result); + } catch (Exception e) { + throw new RyaDAOException(e); + } + } + + @Override + public RyaNamespaceManager<CloudbaseRdfConfiguration> getNamespaceManager() { + return this; + } + + @Override + public void purge(RdfCloudTripleStoreConfiguration configuration) { + for (String tableName : getTables()) { + try { + purge(tableName, configuration.getAuths()); + compact(tableName); + } catch (TableNotFoundException e) { + logger.error(e.getMessage()); + } catch (MutationsRejectedException e) { + logger.error(e.getMessage()); + } + } + try { + if (isInitialized()) { + checkVersion(); + } + } catch (RyaDAOException e) { + logger.error("checkVersion() failed?", e); + } + } + + @Override + public void 
dropAndDestroy() throws RyaDAOException { + for (String tableName : getTables()) { + try { + drop(tableName); + } catch (CBSecurityException e) { + logger.error(e.getMessage()); + throw new RyaDAOException(e); + } catch (CBException e) { + logger.error(e.getMessage()); + throw new RyaDAOException(e); + } catch (TableNotFoundException e) { + logger.warn(e.getMessage()); + } + } + destroy(); + } + + public Connector getConnector() { + return connector; + } + + public void setConnector(Connector connector) { + this.connector = connector; + } + + public CloudbaseRdfConfiguration getConf() { + return conf; + } + + public void setConf(CloudbaseRdfConfiguration conf) { + this.conf = conf; + } + + public RyaTableMutationsFactory getRyaTableMutationsFactory() { + return ryaTableMutationsFactory; + } + + public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { + this.ryaTableMutationsFactory = ryaTableMutationsFactory; + } + + public CloudbaseRyaQueryEngine getQueryEngine() { + return queryEngine; + } + + public void setQueryEngine(CloudbaseRyaQueryEngine queryEngine) { + this.queryEngine = queryEngine; + } + + protected String[] getTables() { + return new String[] { + tableLayoutStrategy.getSpo() + , tableLayoutStrategy.getPo() + , tableLayoutStrategy.getOsp() + , tableLayoutStrategy.getNs() + , tableLayoutStrategy.getEval() + }; + } + + private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { + if (tableExists(tableName)) { + logger.info("Purging cloudbase table: " + tableName); + BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); + try { + batchDeleter.setRanges(Collections.singleton(new Range())); + batchDeleter.delete(); + } finally { + ((TabletServerBatchDeleter)batchDeleter).close(); + } + } + } + + private void compact(String tableName) { + Date now = new Date(System.currentTimeMillis()); + SimpleDateFormat dateParser = new 
SimpleDateFormat("yyyyMMddHHmmssz", Locale.getDefault()); + String nowStr = dateParser.format(now); + try { + for (Map.Entry<String, String> prop : connector.tableOperations().getProperties(tableName)) { + if (prop.getKey().equals(Property.TABLE_MAJC_COMPACTALL_AT.getKey())) { + if (dateParser.parse(prop.getValue()).after(now)) { + return; + } else { + break; + } + } + } + + connector.tableOperations().flush(tableName); + logger.info("Requesting major compaction for table " + tableName); + connector.tableOperations().setProperty(tableName, Property.TABLE_MAJC_COMPACTALL_AT.getKey(), nowStr); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + + private Authorizations getAuthorizations(String auth) { + if (auth == null) { + return new Authorizations(); + } else { + String[] auths = auth.split(","); + return new Authorizations(auths); + } + } + + private boolean tableExists(String tableName) { + return getConnector().tableOperations().exists(tableName); + } + + private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { + return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); + } + + private void checkVersion() throws RyaDAOException { + String version = getVersion(); + if (version == null) { + this.add(getVersionRyaStatement()); + } + //TODO: Do a version check here + } + + protected RyaStatement getVersionRyaStatement() { + return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); + } + + private void drop(String tableName) throws CBSecurityException, CBException, TableNotFoundException { + logger.info("Dropping cloudbase table: " + tableName); + connector.tableOperations().delete(tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java ---------------------------------------------------------------------- 
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java new file mode 100644 index 0000000..8869759 --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableKeyValues.java @@ -0,0 +1,93 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; +import cloudbase.core.security.ColumnVisibility; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; + +import static java.util.AbstractMap.SimpleEntry; +import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; +import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE; + +public class RyaTableKeyValues { + public static final ColumnVisibility EMPTY_CV = new ColumnVisibility(); + public static final Text EMPTY_CV_TEXT = new Text(EMPTY_CV.getExpression()); + + RyaContext instance = RyaContext.getInstance(); + + private RyaStatement stmt; + private Collection<Map.Entry<Key, Value>> spo = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> po = new ArrayList<Map.Entry<Key, Value>>(); + private Collection<Map.Entry<Key, Value>> osp = new ArrayList<Map.Entry<Key, Value>>(); + + public RyaTableKeyValues(RyaStatement stmt) { + this.stmt = stmt; + } + + public Collection<Map.Entry<Key, Value>> getSpo() { + return spo; + } + + public Collection<Map.Entry<Key, Value>> getPo() { + return po; + } + + public Collection<Map.Entry<Key, Value>> getOsp() { + return osp; + } + + public RyaTableKeyValues invoke() throws 
IOException { + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? + */try { + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, mvm.rya.api.resolver.triple.TripleRow> rowMap = instance.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + Text cv = columnVisibility == null ? EMPTY_CV_TEXT : new Text(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + timestamp = timestamp == null ? 0l : timestamp; + byte[] value = tripleRow.getValue(); + Value v = value == null ? EMPTY_VALUE : new Value(value); + spo.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO); + po.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + tripleRow = rowMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP); + osp.add(new SimpleEntry(new Key(new Text(tripleRow.getRow()), + new Text(tripleRow.getColumnFamily()), + new Text(tripleRow.getColumnQualifier()), + cv, timestamp), v)); + } catch (TripleRowResolverException e) { + throw new IOException(e); + } + return this; + } + + @Override + public String toString() { + return "RyaTableKeyValues{" + + "statement=" + stmt + + ", spo=" + spo + + ", po=" + po + + ", o=" + osp + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java ---------------------------------------------------------------------- diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java 
b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java new file mode 100644 index 0000000..ab9b37d --- /dev/null +++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/RyaTableMutationsFactory.java @@ -0,0 +1,81 @@ +package mvm.rya.cloudbase; + +import cloudbase.core.data.Mutation; +import cloudbase.core.data.Value; +import cloudbase.core.security.ColumnVisibility; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_CV; +import static mvm.rya.cloudbase.CloudbaseRdfConstants.EMPTY_VALUE; + +public class RyaTableMutationsFactory { + + RyaContext ryaContext = RyaContext.getInstance(); + + public RyaTableMutationsFactory() { + } + + //TODO: Does this still need to be collections + public Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> serialize( + RyaStatement stmt) throws IOException { + + Collection<Mutation> spo_muts = new ArrayList<Mutation>(); + Collection<Mutation> po_muts = new ArrayList<Mutation>(); + Collection<Mutation> osp_muts = new ArrayList<Mutation>(); + /** + * TODO: If there are contexts, do we still replicate the information into the default graph as well + * as the named graphs? 
+ */ + try { + Map<TABLE_LAYOUT, TripleRow> rowMap = ryaContext.serializeTriple(stmt); + TripleRow tripleRow = rowMap.get(TABLE_LAYOUT.SPO); + spo_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.PO); + po_muts.add(createMutation(tripleRow)); + tripleRow = rowMap.get(TABLE_LAYOUT.OSP); + osp_muts.add(createMutation(tripleRow)); + } catch (TripleRowResolverException fe) { + throw new IOException(fe); + } + + Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutations = + new HashMap<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>>(); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO, spo_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO, po_muts); + mutations.put(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP, osp_muts); + + return mutations; + } + + protected Mutation createMutation(TripleRow tripleRow) { + Mutation mutation = new Mutation(new Text(tripleRow.getRow())); + byte[] columnVisibility = tripleRow.getColumnVisibility(); + ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); + Long timestamp = tripleRow.getTimestamp(); + timestamp = timestamp == null ? 0l : timestamp; + byte[] value = tripleRow.getValue(); + Value v = value == null ? EMPTY_VALUE : new Value(value); + byte[] columnQualifier = tripleRow.getColumnQualifier(); + Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); + byte[] columnFamily = tripleRow.getColumnFamily(); + Text cfText = columnFamily == null ? 
EMPTY_TEXT : new Text(columnFamily);

        mutation.put(cfText,cqText, cv, timestamp, v);
        return mutation;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
----------------------------------------------------------------------
diff --git a/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
new file mode 100644
index 0000000..5c6e8cf
--- /dev/null
+++ b/dao/cloudbase.rya/src/main/java/mvm/rya/cloudbase/mr/eval/CloudbaseRdfCountTool.java
@@ -0,0 +1,350 @@
package mvm.rya.cloudbase.mr.eval;

import cloudbase.core.CBConstants;
import cloudbase.core.client.mapreduce.CloudbaseInputFormat;
import cloudbase.core.client.mapreduce.CloudbaseOutputFormat;
import cloudbase.core.data.Key;
import cloudbase.core.data.Mutation;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.FilteringIterator;
import cloudbase.core.iterators.filter.AgeOffFilter;
import cloudbase.core.security.Authorizations;
import cloudbase.core.security.ColumnVisibility;
import cloudbase.core.util.Pair;
import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;
import mvm.rya.cloudbase.CloudbaseRdfConstants;
import mvm.rya.cloudbase.mr.utils.MRUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;

/**
 * MapReduce tool that gathers evaluation statistics over a Rya triple table:
 * for every subject and predicate (optionally qualified by named-graph
 * context), it emits a count and writes the totals into the eval-stats table
 * ({@code tablePrefix + TBL_EVAL_SUFFIX}) as Cloudbase mutations.
 * <p>
 * Pipeline: {@link CountPiecesMapper} deserializes each row of the configured
 * table layout back into a {@link RyaStatement} and emits one (piece, 1) pair
 * per subject and per predicate; {@link CountPiecesCombiner} pre-aggregates
 * and drops counts <= 2; {@link CountPiecesReducer} drops totals <= 10 and
 * writes the surviving counts as mutations keyed by the piece's data string.
 * <p>
 * Count subject, predicate, object. Save in table
 * Class RdfCloudTripleStoreCountTool
 * Date: Apr 12, 2011
 * Time: 10:39:40 AM
 */
public class CloudbaseRdfCountTool implements Tool {

    // Job-specific TTL override; read again late in run() — see NOTE there.
    public static final String TTL_PROP = "mvm.rya.cloudbase.sail.mr.eval.ttl";

    private Configuration conf;

    /**
     * Command-line entry point. Delegates to Hadoop's ToolRunner so generic
     * options (-D, -conf, ...) are parsed before run() is invoked.
     */
    public static void main(String[] args) {
        try {

            ToolRunner.run(new Configuration(), new CloudbaseRdfCountTool(), args);
        } catch (Exception e) {
            // NOTE(review): failures are printed but the exit code stays 0;
            // callers/scripts cannot detect a failed run this way.
            e.printStackTrace();
        }
    }

    /**
     * cloudbase props
     */
    // Defaults below are fallbacks only; each is overridden from the
    // Configuration in run(). NOTE(review): hard-coded credentials and a
    // site-specific ZooKeeper IP as defaults look like leftover dev settings
    // — consider removing or externalizing them.
    private RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
    private String userName = "root";
    private String pwd = "password";
    private String instance = "stratus";
    private String zk = "10.40.190.113:2181";
    private Authorizations authorizations = CBConstants.NO_AUTHS;
    private String ttl = null;

    /**
     * Configures and submits the statistics-gathering MapReduce job, then
     * blocks until it completes.
     *
     * @param strings unused; all settings come from the Configuration
     *                (MRUtils.* property keys)
     * @return 0 on success, -1 on job failure
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");

        //conf
        zk = conf.get(MRUtils.CB_ZK_PROP, zk);
        ttl = conf.get(MRUtils.CB_TTL_PROP, ttl);
        instance = conf.get(MRUtils.CB_INSTANCE_PROP, instance);
        userName = conf.get(MRUtils.CB_USERNAME_PROP, userName);
        pwd = conf.get(MRUtils.CB_PWD_PROP, pwd);
        // NOTE(review): 'mock' is read but never used in this method.
        boolean mock = conf.getBoolean(MRUtils.CB_MOCK_PROP, false);
        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
        if (tablePrefix != null)
            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
                conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));

        String auth = conf.get(MRUtils.CB_AUTH_PROP);
        if (auth != null)
            authorizations = new Authorizations(auth.split(","));

        // Counts must be exact, so speculative duplicate task attempts are
        // disabled for both map and reduce phases.
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("io.sort.mb", "256");
        Job job = new Job(conf);
        job.setJarByClass(CloudbaseRdfCountTool.class);

        //set ttl
        // NOTE(review): this overwrites the value read from
        // MRUtils.CB_TTL_PROP above — if TTL_PROP is unset, ttl becomes null
        // and the age-off filter below is silently skipped. Confirm whether
        // both properties are meant to feed the filter.
        ttl = conf.get(TTL_PROP);

        // set up cloudbase input
        job.setInputFormatClass(CloudbaseInputFormat.class);
        // NOTE(review): pwd.getBytes() uses the platform default charset;
        // an explicit charset would be safer across JVMs.
        CloudbaseInputFormat.setInputInfo(job, userName, pwd.getBytes(),
                RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix), authorizations);
        CloudbaseInputFormat.setZooKeeperInstance(job, instance, zk);
        // NOTE(review): 'columns' is built but never passed to the job while
        // fetchColumns below stays commented out.
        Collection<Pair<Text, Text>> columns = new ArrayList<Pair<Text, Text>>();
        //TODO: What about named graphs/contexts here?
//        final Pair pair = new Pair(RdfCloudTripleStoreConstants.INFO_TXT, RdfCloudTripleStoreConstants.INFO_TXT);
//        columns.add(pair);
//        CloudbaseInputFormat.fetchColumns(job, columns);
        if (ttl != null) {
            // Age-off filter: rows older than ttl millis are excluded from the
            // scan, so expired triples are not counted.
            CloudbaseInputFormat.setIterator(job, 1, FilteringIterator.class.getName(), "filteringIterator");
            CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", "0", AgeOffFilter.class.getName());
            CloudbaseInputFormat.setIteratorOption(job, "filteringIterator", "0.ttl", ttl);
        }

        // Scan the whole table: empty row through a single 0x7F byte.
        CloudbaseInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));

        // set input output of the particular job
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Mutation.class);

        // set mapper and reducer classes
        job.setMapperClass(CountPiecesMapper.class);
        job.setCombinerClass(CountPiecesCombiner.class);
        job.setReducerClass(CountPiecesReducer.class);

        // NOTE(review): if tablePrefix is null this concatenation yields a
        // table name starting with the literal "null" — verify a prefix is
        // always supplied, or default it like CountPiecesReducer.setup does.
        CloudbaseOutputFormat.setOutputInfo(job, userName, pwd.getBytes(), true, tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
        CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zk);
        job.setOutputFormatClass(CloudbaseOutputFormat.class);

        // Submit the job
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int exitCode = job.waitForCompletion(true) ? 0 : 1;

        if (exitCode == 0) {
            Date end_time = new Date();
            System.out.println("Job ended: " + end_time);
            System.out.println("The job took " +
                    (end_time.getTime() - startTime.getTime()) / 1000 +
                    " seconds.");
            return 0;
        } else {
            System.out.println("Job Failed!!!");
        }

        return -1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public String getInstance() {
        return instance;
    }

    public void setInstance(String instance) {
        this.instance = instance;
    }

    public String getPwd() {
        return pwd;
    }

    public void setPwd(String pwd) {
        this.pwd = pwd;
    }

    public String getZk() {
        return zk;
    }

    public void setZk(String zk) {
        this.zk = zk;
    }

    public String getTtl() {
        return ttl;
    }

    public void setTtl(String ttl) {
        this.ttl = ttl;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Map phase: deserializes each Cloudbase key back into a RyaStatement and
     * emits one (encoded-piece, 1) pair for the subject and one for the
     * predicate. The key encoding is writeUTF(data) + writeUTF(column family
     * tag) + writeBoolean(hasContext) [+ writeUTF(context)], which the reducer
     * decodes symmetrically.
     */
    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {

        public static final byte[] EMPTY_BYTES = new byte[0];
        // Which triple ordering the input table uses; refreshed in setup().
        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;

        // NOTE(review): vf is never used in this mapper.
        ValueFactoryImpl vf = new ValueFactoryImpl();

        // Reused across map() calls to avoid per-record allocation.
        private Text keyOut = new Text();
        private LongWritable valOut = new LongWritable(1);
        private RyaContext ryaContext = RyaContext.getInstance();

        /**
         * Reads the table layout from the job configuration (defaults to OSP).
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
        }

        /**
         * Emits (subject, 1) and (predicate, 1) for the statement encoded in
         * this key; the object piece is intentionally skipped (see TODO).
         *
         * @throws IOException wrapping any TripleRowResolverException so the
         *                     framework records the bad row as a task failure
         */
        @Override
        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
            try {
                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
                //count each piece subject, pred, object

                String subj = statement.getSubject().getData();
                String pred = statement.getPredicate().getData();
//                byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
                RyaURI scontext = statement.getContext();
                boolean includesContext = scontext != null;
                String scontext_str = (includesContext) ? scontext.getData() : null;

                // Subject piece: UTF(subject) + UTF(SUBJECT_CF) + hasContext flag.
                ByteArrayDataOutput output = ByteStreams.newDataOutput();
                output.writeUTF(subj);
                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
                output.writeBoolean(includesContext);
                if (includesContext)
                    output.writeUTF(scontext_str);
                keyOut.set(output.toByteArray());
                context.write(keyOut, valOut);

                // Predicate piece: same encoding with PRED_CF tag.
                output = ByteStreams.newDataOutput();
                output.writeUTF(pred);
                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
                output.writeBoolean(includesContext);
                if (includesContext)
                    output.writeUTF(scontext_str);
                keyOut.set(output.toByteArray());
                context.write(keyOut, valOut);


                //TODO: Obj in eval stats table?
//                output = ByteStreams.newDataOutput();
//                output.write(objBytes);
//                output.writeByte(RdfCloudTripleStoreConstants.DELIM_BYTE);
//                output.writeUTF(RdfCloudTripleStoreConstants.OBJ_CF);
//                output.writeBoolean(includesContext);
//                if (includesContext)
//                    output.write(scontext_bytes);
//                keyOut.set(output.toByteArray());
//                context.write(keyOut, valOut);
            } catch (TripleRowResolverException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Combiner: locally sums counts per encoded piece and suppresses any
     * partial sum <= TOO_LOW to cut shuffle volume. Because a piece's count
     * can be split across mappers, suppressed partials may make the final
     * reducer total an undercount — acknowledged in the TODO below.
     */
    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable valOut = new LongWritable();

        // TODO: can still add up to be larger I guess
        // any count lower than this does not need to be saved
        public static final int TOO_LOW = 2;

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable lw : values) {
                count += lw.get();
            }

            if (count <= TOO_LOW)
                return;

            valOut.set(count);
            context.write(key, valOut);
        }

    }

    /**
     * Reduce phase: decodes the mapper's piece encoding, sums the counts, and
     * writes each total > TOO_LOW to the eval-stats table as a mutation —
     * row = piece data, column family = SUBJECT_CF/PRED_CF tag, column
     * qualifier = context URI (or empty), value = count as ASCII digits.
     */
    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {

        // Reused per reduce() call; safe because the framework is single-threaded
        // per reducer instance.
        Text row = new Text();
        Text cat_txt = new Text();
        Value v_out = new Value();
        // NOTE(review): vf is never used in this reducer.
        ValueFactory vf = new ValueFactoryImpl();

        // any count lower than this does not need to be saved
        public static final int TOO_LOW = 10;
        private String tablePrefix;
        protected Text table;
        // Visibility stamped on every written cell; overridden via CB_CV_PROP.
        private ColumnVisibility cv = CloudbaseRdfConstants.EMPTY_CV;

        /**
         * Resolves the destination table name (prefix defaults to
         * TBL_PRFX_DEF, unlike run()) and the optional column visibility.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
            final String cv_s = context.getConfiguration().get(MRUtils.CB_CV_PROP);
            if (cv_s != null)
                cv = new ColumnVisibility(cv_s);
        }

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable lw : values) {
                count += lw.get();
            }

            if (count <= TOO_LOW)
                return;

            // Decode the mapper's key: UTF(data) + UTF(cf tag) + boolean
            // [+ UTF(context)]. NOTE(review): key.getBytes() may return a
            // backing array longer than key.getLength(); reading stops before
            // any trailing bytes, so this works, but copyBytes-style access
            // would be more obviously correct.
            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
            String v = badi.readUTF();
            cat_txt.set(badi.readUTF());

            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
            boolean includesContext = badi.readBoolean();
            if (includesContext) {
                columnQualifier = new Text(badi.readUTF());
            }

            row.set(v);
            Mutation m = new Mutation(row);
            v_out.set((count + "").getBytes());
            m.put(cat_txt, columnQualifier, cv, v_out);
            context.write(table, m);
        }

    }
}
\ No newline at end of file
