http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/java/mvm/rya/rdftriplestore/evaluation/RdfCloudTripleStoreSelectivityEvaluationStatisticsTest.java ---------------------------------------------------------------------- diff --git a/sail/src/test/java/mvm/rya/rdftriplestore/evaluation/RdfCloudTripleStoreSelectivityEvaluationStatisticsTest.java b/sail/src/test/java/mvm/rya/rdftriplestore/evaluation/RdfCloudTripleStoreSelectivityEvaluationStatisticsTest.java new file mode 100644 index 0000000..c5f56cf --- /dev/null +++ b/sail/src/test/java/mvm/rya/rdftriplestore/evaluation/RdfCloudTripleStoreSelectivityEvaluationStatisticsTest.java @@ -0,0 +1,304 @@ +package mvm.rya.rdftriplestore.evaluation; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.layout.TablePrefixLayoutStrategy; +import mvm.rya.api.persist.RdfEvalStatsDAO; +import mvm.rya.joinselect.AccumuloSelectivityEvalDAO; +import mvm.rya.prospector.service.ProspectorServiceEvalStatsDAO; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +public class RdfCloudTripleStoreSelectivityEvaluationStatisticsTest { + + // TODO fix table names!!! 
+ + private static final String DELIM = "\u0000"; + private final byte[] EMPTY_BYTE = new byte[0]; + private final Value EMPTY_VAL = new Value(EMPTY_BYTE); + + private String q1 = ""// + + "SELECT ?h " // + + "{" // + + " ?h <http://www.w3.org/2000/01/rdf-schema#label> <uri:dog> ."// + + " ?h <uri:barksAt> <uri:cat> ."// + + " ?h <uri:peesOn> <uri:hydrant> . "// + + "}";// + + private Connector conn; + AccumuloRdfConfiguration arc; + BatchWriterConfig config; + Instance mock; + + @Before + public void init() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { + + mock = new MockInstance("accumulo"); + PasswordToken pToken = new PasswordToken("pass".getBytes()); + conn = mock.getConnector("user", pToken); + + config = new BatchWriterConfig(); + config.setMaxMemory(1000); + config.setMaxLatency(1000, TimeUnit.SECONDS); + config.setMaxWriteThreads(10); + + if (conn.tableOperations().exists("rya_prospects")) { + conn.tableOperations().delete("rya_prospects"); + } + if (conn.tableOperations().exists("rya_selectivity")) { + conn.tableOperations().delete("rya_selectivity"); + } + + arc = new AccumuloRdfConfiguration(); + arc.setTableLayoutStrategy(new TablePrefixLayoutStrategy()); + arc.setMaxRangesForScanner(300); + + } + + @Test + public void testOptimizeQ1() throws Exception { + + RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> res = new ProspectorServiceEvalStatsDAO(conn, arc); + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setRdfEvalDAO(res); + accc.setConnector(conn); + accc.init(); + + BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config); + BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config); + + String s1 = "predicateobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + List<Mutation> mList = new ArrayList<Mutation>(); + List<Mutation> mList2 = new ArrayList<Mutation>(); + List<String> sList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate", + "predicatesubject"); + Mutation m1, m2, m3, m4; + + m1 = new Mutation(s1 + DELIM + "1"); + m1.put(new Text("count"), new Text(""), new Value("1".getBytes())); + m2 = new Mutation(s2 + DELIM + "2"); + m2.put(new Text("count"), new Text(""), new Value("2".getBytes())); + m3 = new Mutation(s3 + DELIM + "3"); + m3.put(new Text("count"), new Text(""), new Value("3".getBytes())); + mList.add(m1); + mList.add(m2); + mList.add(m3); + + bw1.addMutations(mList); + bw1.close(); + +// Scanner scan = conn.createScanner("rya_prospects", new Authorizations()); +// scan.setRange(new Range()); + +// for (Map.Entry<Key, Value> entry : scan) { +// System.out.println("Key row string is " + entry.getKey().getRow().toString()); +// System.out.println("Key is " + entry.getKey()); +// System.out.println("Value is " + (new String(entry.getValue().get()))); +// } + + m1 = new Mutation(s1); + m2 = new Mutation(s2); + m3 = new Mutation(s3); + m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m4.put(new Text("FullTableCardinality"), new Text("100"), EMPTY_VAL); + int i = 2; + int j = 3; + int k = 4; + Long count1; + Long count2; + Long count3; + + for (String s : sList) { + count1 = (long) i; + count2 = (long) j; + count3 = (long) k; + m1.put(new Text(s), new 
Text(count1.toString()), EMPTY_VAL); + m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL); + m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL); + i = 2 * i; + j = 2 * j; + k = 2 * k; + } + mList2.add(m1); + mList2.add(m2); + mList2.add(m3); + mList2.add(m4); + bw2.addMutations(mList2); + bw2.close(); + +// scan = conn.createScanner("rya_selectivity", new Authorizations()); +// scan.setRange(new Range()); + +// for (Map.Entry<Key, Value> entry : scan) { +// System.out.println("Key row string is " + entry.getKey().getRow().toString()); +// System.out.println("Key is " + entry.getKey()); +// System.out.println("Value is " + (new String(entry.getKey().getColumnQualifier().toString()))); +// +// } + + TupleExpr te = getTupleExpr(q1); + System.out.println(te); + + RdfCloudTripleStoreSelectivityEvaluationStatistics ars = new RdfCloudTripleStoreSelectivityEvaluationStatistics(arc, res, accc); + double card = ars.getCardinality(te); + + Assert.assertEquals(6.3136, card, .0001); + + } + + @Test + public void testOptimizeQ1ZeroCard() throws Exception { + + RdfEvalStatsDAO<RdfCloudTripleStoreConfiguration> res = new ProspectorServiceEvalStatsDAO(conn, arc); + AccumuloSelectivityEvalDAO accc = new AccumuloSelectivityEvalDAO(); + accc.setConf(arc); + accc.setConnector(conn); + accc.setRdfEvalDAO(res); + accc.init(); + + BatchWriter bw1 = conn.createBatchWriter("rya_prospects", config); + BatchWriter bw2 = conn.createBatchWriter("rya_selectivity", config); + + String s1 = "predicateobject" + DELIM + "http://www.w3.org/2000/01/rdf-schema#label" + DELIM + "uri:dog"; + String s2 = "predicateobject" + DELIM + "uri:barksAt" + DELIM + "uri:cat"; + String s3 = "predicateobject" + DELIM + "uri:peesOn" + DELIM + "uri:hydrant"; + List<Mutation> mList = new ArrayList<Mutation>(); + List<Mutation> mList2 = new ArrayList<Mutation>(); + List<String> sList = Arrays.asList("subjectobject", "subjectpredicate", "subjectsubject", "predicateobject", "predicatepredicate", + "predicatesubject"); + Mutation m1, m2, m3, m4; + + m1 = new Mutation(s1 + DELIM + "1"); + m1.put(new Text("count"), new Text(""), new Value("1".getBytes())); + m2 = new Mutation(s2 + DELIM + "2"); + m2.put(new Text("count"), new Text(""), new Value("2".getBytes())); + // m3 = new Mutation(s3 + DELIM + "3"); + // m3.put(new Text("count"), new Text(""), new Value("3".getBytes())); + mList.add(m1); + mList.add(m2); + // mList.add(m3); + + bw1.addMutations(mList); + bw1.close(); + +// Scanner scan = conn.createScanner("rya_prospects", new Authorizations()); +// scan.setRange(new Range()); + +// for (Map.Entry<Key, Value> entry : scan) { +// System.out.println("Key row string is " + entry.getKey().getRow().toString()); +// System.out.println("Key is " + entry.getKey()); +// System.out.println("Value is " + (new String(entry.getValue().get()))); +// } + + m1 = new Mutation(s1); + m2 = new Mutation(s2); + m3 = new Mutation(s3); + m4 = new Mutation(new Text("subjectpredicateobject" + DELIM + "FullTableCardinality")); + m4.put(new Text("FullTableCardinality"), new Text("100"), EMPTY_VAL); + int i = 2; + int j = 3; + int k = 4; + Long count1; + Long count2; + Long count3; + + for (String s : sList) { + count1 = (long) i; + count2 = (long) j; + count3 = (long) k; + m1.put(new Text(s), new Text(count1.toString()), EMPTY_VAL); + m2.put(new Text(s), new Text(count2.toString()), EMPTY_VAL); + m3.put(new Text(s), new Text(count3.toString()), EMPTY_VAL); + i = 2 * i; + j = 2 * j; + k = 2 * k; + } + mList2.add(m1); + mList2.add(m2); + mList2.add(m3); + 
mList2.add(m4); + bw2.addMutations(mList2); + bw2.close(); + +// scan = conn.createScanner("rya_selectivity", new Authorizations()); +// scan.setRange(new Range()); + +// for (Map.Entry<Key, Value> entry : scan) { +// System.out.println("Key row string is " + entry.getKey().getRow().toString()); +// System.out.println("Key is " + entry.getKey()); +// System.out.println("Value is " + (new String(entry.getKey().getColumnQualifier().toString()))); +// +// } + + TupleExpr te = getTupleExpr(q1); + System.out.println(te); + + RdfCloudTripleStoreSelectivityEvaluationStatistics ars = new RdfCloudTripleStoreSelectivityEvaluationStatistics(arc, res, accc); + double card = ars.getCardinality(te); + + Assert.assertEquals(4.04, card, .0001); + + } + + private TupleExpr getTupleExpr(String query) throws MalformedQueryException { + + SPARQLParser sp = new SPARQLParser(); + ParsedQuery pq = sp.parseQuery(query, null); + + return pq.getTupleExpr(); + } + +}
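[Editor's note] Both tests above follow the same mock-Accumulo write-then-scan pattern: build Mutations, flush them through a BatchWriter, and (in the commented-out debug blocks) read them back with a Scanner. Below is a minimal, self-contained sketch of that pattern using only the Accumulo 1.x client calls already visible in the test; the table name, row id, and values are illustrative, not part of the commit.

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

public class MockAccumuloRoundTrip {

    public static void main(String[] args) throws Exception {
        // In-memory Accumulo; no cluster needed, same as the tests above.
        Connector conn = new MockInstance("accumulo")
                .getConnector("user", new PasswordToken("pass".getBytes()));
        conn.tableOperations().create("demo_table"); // illustrative table name

        BatchWriterConfig config = new BatchWriterConfig();
        config.setMaxMemory(1000);
        config.setMaxLatency(1000, TimeUnit.SECONDS);
        config.setMaxWriteThreads(10);

        // Write one entry: row "r1", column family "count", empty qualifier.
        BatchWriter bw = conn.createBatchWriter("demo_table", config);
        Mutation m = new Mutation(new Text("r1"));
        m.put(new Text("count"), new Text(""), new Value("1".getBytes()));
        bw.addMutation(m);
        bw.close();

        // Read everything back, as the commented-out debug blocks do.
        Scanner scan = conn.createScanner("demo_table", new Authorizations());
        scan.setRange(new Range());
        for (Map.Entry<Key, Value> entry : scan) {
            System.out.println(entry.getKey().getRow() + " -> "
                    + new String(entry.getValue().get()));
        }
    }
}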
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/java/mvm/rya/triplestore/inference/SameAsTest.java ---------------------------------------------------------------------- diff --git a/sail/src/test/java/mvm/rya/triplestore/inference/SameAsTest.java b/sail/src/test/java/mvm/rya/triplestore/inference/SameAsTest.java new file mode 100644 index 0000000..d214123 --- /dev/null +++ b/sail/src/test/java/mvm/rya/triplestore/inference/SameAsTest.java @@ -0,0 +1,115 @@ +package mvm.rya.triplestore.inference; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + + +import info.aduna.iteration.Iterations; +import junit.framework.TestCase; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.inference.InferenceEngine; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.junit.Test; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; + +public class SameAsTest extends TestCase { + private String user = "user"; + private String pwd = "pwd"; + private String instance = "myinstance"; + private String tablePrefix = "t_"; + private Authorizations auths = Constants.NO_AUTHS; + private Connector connector; + private AccumuloRyaDAO ryaDAO; + private ValueFactory vf = new ValueFactoryImpl(); + private String namespace = "urn:test#"; + private AccumuloRdfConfiguration conf; + + @Override + public void setUp() throws Exception { + super.setUp(); + connector = new MockInstance(instance).getConnector(user, pwd.getBytes()); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + connector.tableOperations().create(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); + SecurityOperations secOps = connector.securityOperations(); + secOps.createUser(user, pwd.getBytes(), auths); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, TablePermission.READ); + 
secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, TablePermission.READ); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, TablePermission.READ); + secOps.grantTablePermission(user, tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX, TablePermission.READ); + + conf = new AccumuloRdfConfiguration(); + ryaDAO = new AccumuloRyaDAO(); + ryaDAO.setConnector(connector); + conf.setTablePrefix(tablePrefix); + ryaDAO.setConf(conf); + ryaDAO.init(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX); + connector.tableOperations().delete(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX); + } + + @Test + //This isn't a good test. It's simply a cut-and-paste from a test that was failing in a different package in the SameAsVisitor. + public void testGraphConfiguration() throws Exception { + URI a = vf.createURI(namespace, "a"); + Statement statement = new StatementImpl(a, vf.createURI(namespace, "p"), vf.createLiteral("l")); + Statement statement2 = new StatementImpl(a, vf.createURI(namespace, "p2"), vf.createLiteral("l")); + ryaDAO.add(RdfToRyaConversions.convertStatement(statement)); + ryaDAO.add(RdfToRyaConversions.convertStatement(statement2)); + ryaDAO.add(RdfToRyaConversions.convertStatement(new StatementImpl(vf.createURI(namespace, "b"), vf.createURI(namespace, "p"), vf.createLiteral("l")))); + ryaDAO.add(RdfToRyaConversions.convertStatement(new StatementImpl(vf.createURI(namespace, "c"), vf.createURI(namespace, "n"), vf.createLiteral("l")))); + + // build a connection + RdfCloudTripleStore store = new RdfCloudTripleStore(); + store.setConf(conf); + store.setRyaDAO(ryaDAO); + + InferenceEngine inferenceEngine = new InferenceEngine(); + inferenceEngine.setRyaDAO(ryaDAO); + store.setInferenceEngine(inferenceEngine); + + store.initialize(); + + System.out.println(Iterations.asList(store.getConnection().getStatements(a, vf.createURI(namespace, "p"), vf.createLiteral("l"), false, new Resource[0])).size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/resources/cdrdf.xml ---------------------------------------------------------------------- diff --git a/sail/src/test/resources/cdrdf.xml b/sail/src/test/resources/cdrdf.xml new file mode 100644 index 0000000..cd02ed2 --- /dev/null +++ b/sail/src/test/resources/cdrdf.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + + +<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:cd="http://www.recshop.fake/cd#"> + + <rdf:Description rdf:about="http://www.recshop.fake/cd/Empire_Burlesque"> + <cd:artist>Bob Dylan</cd:artist> + <cd:country>USA</cd:country> + <cd:company>Columbia</cd:company> + <cd:price>10.90</cd:price> + <cd:year>1985</cd:year> + </rdf:Description> + + <rdf:Description rdf:about="http://www.recshop.fake/cd/Hide_your_fingers"> + <cd:artist>Bonnie Tyler</cd:artist> + <cd:country>UK</cd:country> + <cd:company>CBS Records</cd:company> + <cd:price>9.90</cd:price> + <cd:year>1993</cd:year> + </rdf:Description> +</rdf:RDF> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/resources/namedgraphs.trig ---------------------------------------------------------------------- diff --git a/sail/src/test/resources/namedgraphs.trig b/sail/src/test/resources/namedgraphs.trig new file mode 100644 index 0000000..748d276 --- /dev/null +++ b/sail/src/test/resources/namedgraphs.trig @@ -0,0 +1,37 @@ +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . +@prefix swp: <http://www.w3.org/2004/03/trix/swp-1/> . +@prefix dc: <http://purl.org/dc/elements/1.1/> . +@prefix ex: <http://www.example.org/vocabulary#> . +@prefix : <http://www.example.org/exampleDocument#> . +:G1 { :Monica ex:name "Monica Murphy" . + :Monica ex:homepage <http://www.monicamurphy.org> . + :Monica ex:email <mailto:[email protected]> . + :Monica ex:one <mailto:[email protected]> . + :Monica ex:two <mailto:[email protected]> . + :Monica ex:three <mailto:[email protected]> . + :Monica ex:four <mailto:[email protected]> . + :Monica ex:five <mailto:[email protected]> . + :Monica ex:six <mailto:[email protected]> . + :Monica ex:seven <mailto:[email protected]> . + :Monica ex:eight <mailto:[email protected]> . + :Monica ex:nine <mailto:[email protected]> . + :Monica ex:ten <mailto:[email protected]> . + :Monica ex:hasSkill ex:Management } + +:G2 { :Monica rdf:type ex:Person . + :Monica ex:hasSkill ex:Programming } + +:G4 { :Phobe ex:name "Phobe Buffet" } + +:G3 { :G1 swp:assertedBy _:w1 . + _:w1 swp:authority :Chris . + _:w1 dc:date "2003-10-02"^^xsd:date . + :G2 swp:quotedBy _:w2 . + :G4 swp:assertedBy _:w2 . + _:w2 dc:date "2003-09-03"^^xsd:date . + _:w2 swp:authority :Tom . + :Chris rdf:type ex:Person . + :Chris ex:email <mailto:[email protected]>. + :Tom rdf:type ex:Person . + :Tom ex:email <mailto:[email protected]>} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/resources/ntriples.nt ---------------------------------------------------------------------- diff --git a/sail/src/test/resources/ntriples.nt b/sail/src/test/resources/ntriples.nt new file mode 100644 index 0000000..edf1190 --- /dev/null +++ b/sail/src/test/resources/ntriples.nt @@ -0,0 +1 @@ +<urn:lubm:rdfts#GraduateStudent> <http://www.w3.org/2000/01/rdf-schema#subPropertyOf> <urn:lubm:rdfts#Student> . 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/resources/reification.xml ---------------------------------------------------------------------- diff --git a/sail/src/test/resources/reification.xml b/sail/src/test/resources/reification.xml new file mode 100644 index 0000000..5ab7722 --- /dev/null +++ b/sail/src/test/resources/reification.xml @@ -0,0 +1,36 @@ +<?xml version="1.0"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + + +<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:cd="http://www.recshop.fake/cd#" + xmlns:mm="http://mvm.com/owl/2010/10/mm.owl#"> + + <rdf:Description rdf:nodeID="A4"> + <rdf:subject + rdf:resource="http://mvm.com/owl/2010/10/mm.owl#urn:mm:mvm:root/cimv2:Linux_Processor:0:CIM_ComputerSystem:nimbus02.bullpen.net"/> + <rdf:predicate rdf:resource="http://mvm.com/owl/2010/10/mm.owl#loadPercentage"/> + <rdf:object rdf:datatype="http://www.w3.org/2001/XMLSchema#int">1</rdf:object> + <rdf:type rdf:resource="http://www.w3.org/1999/02/22-rdf-syntax-ns#Statement"/> + <mm:reportedAt rdf:datatype="http://www.w3.org/2001/XMLSchema#dateTime">2011-01-07T21:29:45.545Z</mm:reportedAt> + </rdf:Description> + +</rdf:RDF> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/sail/src/test/resources/univ-bench.owl ---------------------------------------------------------------------- diff --git a/sail/src/test/resources/univ-bench.owl b/sail/src/test/resources/univ-bench.owl new file mode 100644 index 0000000..691a330 --- /dev/null +++ b/sail/src/test/resources/univ-bench.owl @@ -0,0 +1,466 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<rdf:RDF + xmlns="urn:lubm:rdfts#" + xml:base="urn:lubm:rdfts#" + xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" + xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#" + xmlns:owl="http://www.w3.org/2002/07/owl#" +> + +<owl:Class rdf:ID="AdministrativeStaff"> + <rdfs:label>administrative staff worker</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Employee" /> +</owl:Class> + +<owl:Class rdf:ID="Article"> + <rdfs:label>article</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="AssistantProfessor"> + <rdfs:label>assistant professor</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="AssociateProfessor"> + <rdfs:label>associate professor</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="Book"> + <rdfs:label>book</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="Chair"> + <rdfs:label>chair</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty 
rdf:resource="#headOf" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#Department" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="ClericalStaff"> + <rdfs:label>clerical staff worker</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#AdministrativeStaff" /> +</owl:Class> + +<owl:Class rdf:ID="College"> + <rdfs:label>school</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="ConferencePaper"> + <rdfs:label>conference paper</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Article" /> +</owl:Class> + +<owl:Class rdf:ID="Course"> + <rdfs:label>teaching course</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Work" /> +</owl:Class> + +<owl:Class rdf:ID="Dean"> + <rdfs:label>dean</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty rdf:resource="#headOf" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#College" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="Department"> + <rdfs:label>university department</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="Director"> + <rdfs:label>director</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty rdf:resource="#headOf" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#Program" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> +</owl:Class> + +<owl:Class rdf:ID="Employee"> + <rdfs:label>Employee</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty rdf:resource="#worksFor" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#Organization" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> +</owl:Class> + +<owl:Class rdf:ID="Faculty"> + <rdfs:label>faculty member</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Employee" /> +</owl:Class> + +<owl:Class rdf:ID="FullProfessor"> + <rdfs:label>full professor</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="GraduateCourse"> + <rdfs:label>Graduate Level Courses</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Course" /> +</owl:Class> + +<owl:Class rdf:ID="GraduateStudent"> + <rdfs:label>graduate student</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Person" /> + <rdfs:subPropertyOf> + <owl:Restriction> + <owl:onProperty rdf:resource="#takesCourse" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#GraduateCourse" /> + </owl:someValuesFrom> + </owl:Restriction> + </rdfs:subPropertyOf> +</owl:Class> + +<owl:Class rdf:ID="Institute"> + <rdfs:label>institute</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="JournalArticle"> + <rdfs:label>journal article</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Article" /> +</owl:Class> + +<owl:Class rdf:ID="Lecturer"> + <rdfs:label>lecturer</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Faculty" /> +</owl:Class> + +<owl:Class rdf:ID="Manual"> + <rdfs:label>manual</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="Organization"> + <rdfs:label>organization</rdfs:label> +</owl:Class> + +<owl:Class 
rdf:ID="Person"> + <rdfs:label>person</rdfs:label> +</owl:Class> + +<owl:Class rdf:ID="PostDoc"> + <rdfs:label>post doctorate</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Faculty" /> +</owl:Class> + +<owl:Class rdf:ID="Professor"> + <rdfs:label>professor</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Faculty" /> +</owl:Class> + +<owl:Class rdf:ID="Program"> + <rdfs:label>program</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="Publication"> + <rdfs:label>publication</rdfs:label> +</owl:Class> + +<owl:Class rdf:ID="Research"> + <rdfs:label>research work</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Work" /> +</owl:Class> + +<owl:Class rdf:ID="ResearchAssistant"> + <rdfs:label>university research assistant</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Student" /> + <rdfs:subPropertyOf> + <owl:Restriction> + <owl:onProperty rdf:resource="#worksFor" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#ResearchGroup" /> + </owl:someValuesFrom> + </owl:Restriction> + </rdfs:subPropertyOf> +</owl:Class> + +<owl:Class rdf:ID="ResearchGroup"> + <rdfs:label>research group</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="Schedule"> + <rdfs:label>schedule</rdfs:label> +</owl:Class> + +<owl:Class rdf:ID="Software"> + <rdfs:label>software program</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="Specification"> + <rdfs:label>published specification</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="Student"> + <rdfs:label>student</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty rdf:resource="#takesCourse" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#Course" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> +</owl:Class> + +<owl:Class rdf:ID="SystemsStaff"> + <rdfs:label>systems staff worker</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#AdministrativeStaff" /> +</owl:Class> + +<owl:Class rdf:ID="TeachingAssistant"> + <rdfs:label>university teaching assistant</rdfs:label> + <owl:intersectionOf rdf:parseType="Collection"> + <owl:Class rdf:about="#Person" /> + <owl:Restriction> + <owl:onProperty rdf:resource="#teachingAssistantOf" /> + <owl:someValuesFrom> + <owl:Class rdf:about="#Course" /> + </owl:someValuesFrom> + </owl:Restriction> + </owl:intersectionOf> +</owl:Class> + +<owl:Class rdf:ID="TechnicalReport"> + <rdfs:label>technical report</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Article" /> +</owl:Class> + +<owl:Class rdf:ID="UndergraduateStudent"> + <rdfs:label>undergraduate student</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Student" /> +</owl:Class> + +<owl:Class rdf:ID="University"> + <rdfs:label>university</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Organization" /> +</owl:Class> + +<owl:Class rdf:ID="UnofficialPublication"> + <rdfs:label>unnoficial publication</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Publication" /> +</owl:Class> + +<owl:Class rdf:ID="VisitingProfessor"> + <rdfs:label>visiting professor</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#Professor" /> +</owl:Class> + +<owl:Class rdf:ID="Work"> + <rdfs:label>Work</rdfs:label> +</owl:Class> + +<owl:ObjectProperty rdf:ID="advisor"> + <rdfs:label>is being advised by</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> + <rdfs:range rdf:resource="#Professor" /> 
+</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="affiliatedOrganizationOf"> + <rdfs:label>is affiliated with</rdfs:label> + <rdfs:domain rdf:resource="#Organization" /> + <rdfs:range rdf:resource="#Organization" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="affiliateOf"> + <rdfs:label>is affiliated with</rdfs:label> + <rdfs:domain rdf:resource="#Organization" /> + <rdfs:range rdf:resource="#Person" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="age"> + <rdfs:label>is age</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> +</owl:DatatypeProperty> + +<owl:ObjectProperty rdf:ID="degreeFrom"> + <rdfs:label>has a degree from</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> + <rdfs:range rdf:resource="#University" /> + <owl:inverseOf rdf:resource="#hasAlumnus"/> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="doctoralDegreeFrom"> + <rdfs:label>has a doctoral degree from</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> + <rdfs:range rdf:resource="#University" /> + <rdfs:subPropertyOf rdf:resource="#degreeFrom" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="emailAddress"> + <rdfs:label>can be reached at</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> +</owl:DatatypeProperty> + +<owl:ObjectProperty rdf:ID="hasAlumnus"> + <rdfs:label>has as an alumnus</rdfs:label> + <rdfs:domain rdf:resource="#University" /> + <rdfs:range rdf:resource="#Person" /> + <owl:inverseOf rdf:resource="#degreeFrom"/> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="headOf"> + <rdfs:label>is the head of</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#worksFor"/> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="listedCourse"> + <rdfs:label>lists as a course</rdfs:label> + <rdfs:domain rdf:resource="#Schedule" /> + <rdfs:range rdf:resource="#Course" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="mastersDegreeFrom"> + <rdfs:label>has a masters degree from</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> + <rdfs:range rdf:resource="#University" /> + <rdfs:subPropertyOf rdf:resource="#degreeFrom"/> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="member"> + <rdfs:label>has as a member</rdfs:label> + <rdfs:domain rdf:resource="#Organization" /> + <rdfs:range rdf:resource="#Person" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="memberOf"> +<rdfs:label>member of</rdfs:label> +<owl:inverseOf rdf:resource="#member" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="name"> +<rdfs:label>name</rdfs:label> +</owl:DatatypeProperty> + +<owl:DatatypeProperty rdf:ID="officeNumber"> + <rdfs:label>office room No.</rdfs:label> +</owl:DatatypeProperty> + +<owl:ObjectProperty rdf:ID="orgPublication"> + <rdfs:label>publishes</rdfs:label> + <rdfs:domain rdf:resource="#Organization" /> + <rdfs:range rdf:resource="#Publication" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="publicationAuthor"> + <rdfs:label>was written by</rdfs:label> + <rdfs:domain rdf:resource="#Publication" /> + <rdfs:range rdf:resource="#Person" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="publicationDate"> + <rdfs:label>was written on</rdfs:label> + <rdfs:domain rdf:resource="#Publication" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="publicationResearch"> + <rdfs:label>is about</rdfs:label> + <rdfs:domain rdf:resource="#Publication" /> + <rdfs:range rdf:resource="#Research" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="researchInterest"> + <rdfs:label>is researching</rdfs:label> +</owl:DatatypeProperty> + 
+<owl:ObjectProperty rdf:ID="researchProject"> + <rdfs:label>has as a research project</rdfs:label> + <rdfs:domain rdf:resource="#ResearchGroup" /> + <rdfs:range rdf:resource="#Research" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="softwareDocumentation"> + <rdfs:label>is documented in</rdfs:label> + <rdfs:domain rdf:resource="#Software" /> + <rdfs:range rdf:resource="#Publication" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="softwareVersion"> + <rdfs:label>is version</rdfs:label> + <rdfs:domain rdf:resource="#Software" /> +</owl:ObjectProperty> + +<owl:TransitiveProperty rdf:ID="subOrganizationOf"> + <rdfs:label>is part of</rdfs:label> + <rdfs:domain rdf:resource="#Organization" /> + <rdfs:range rdf:resource="#Organization" /> +</owl:TransitiveProperty> + +<owl:ObjectProperty rdf:ID="takesCourse"> + <rdfs:label>is taking</rdfs:label> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="teacherOf"> + <rdfs:label>teaches</rdfs:label> + <rdfs:domain rdf:resource="#Faculty" /> + <rdfs:range rdf:resource="#Course" /> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="teachingAssistantOf"> + <rdfs:label>is a teaching assistant for</rdfs:label> + <rdfs:domain rdf:resource="#TeachingAssistant" /> + <rdfs:range rdf:resource="#Course" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="telephone"> + <rdfs:label>telephone number</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> +</owl:DatatypeProperty> + +<owl:ObjectProperty rdf:ID="tenured"> + <rdfs:label>is tenured:</rdfs:label> + <rdfs:domain rdf:resource="#Professor" /> +</owl:ObjectProperty> + +<owl:DatatypeProperty rdf:ID="title"> + <rdfs:label>title</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> +</owl:DatatypeProperty> + +<owl:ObjectProperty rdf:ID="undergraduateDegreeFrom"> + <rdfs:label>has an undergraduate degree from</rdfs:label> + <rdfs:domain rdf:resource="#Person" /> + <rdfs:range rdf:resource="#University" /> + <rdfs:subPropertyOf rdf:resource="#degreeFrom"/> +</owl:ObjectProperty> + +<owl:ObjectProperty rdf:ID="worksFor"> + <rdfs:label>Works For</rdfs:label> + <rdfs:subPropertyOf rdf:resource="#memberOf" /> +</owl:ObjectProperty> + +</rdf:RDF> + http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/utils/cloudbase.utils/pom.xml ---------------------------------------------------------------------- diff --git a/utils/cloudbase.utils/pom.xml b/utils/cloudbase.utils/pom.xml deleted file mode 100644 index 371d71d..0000000 --- a/utils/cloudbase.utils/pom.xml +++ /dev/null @@ -1,67 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>mvm.rya</groupId> - <artifactId>rya.utils</artifactId> - <version>3.2.10-SNAPSHOT</version> - </parent> - <artifactId>cloudbase.utils</artifactId> - <name>${project.groupId}.${project.artifactId}</name> - <dependencies> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </dependency> - - <!-- Test-scoped dependencies --> - <dependency> - 
<groupId>cloudbase</groupId> - <artifactId>cloudbase-start</artifactId> - <version>1.3.4</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>thrift</artifactId> - <version>0.3</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-jci-core</artifactId> - <version>1.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-jci-fam</artifactId> - <version>1.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>3.2</version> - <scope>test</scope> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/bulk/KeyRangePartitioner.java ---------------------------------------------------------------------- diff --git a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/bulk/KeyRangePartitioner.java b/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/bulk/KeyRangePartitioner.java deleted file mode 100644 index dcf54b3..0000000 --- a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/bulk/KeyRangePartitioner.java +++ /dev/null @@ -1,35 +0,0 @@ -package mvm.rya.cloudbase.utils.bulk; - -import cloudbase.core.client.mapreduce.lib.partition.RangePartitioner; -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Partitioner; - -/** - * Class KeyRangePartitioner - * Date: Sep 13, 2011 - * Time: 2:45:56 PM - */ -public class KeyRangePartitioner extends Partitioner<Key, Value> implements Configurable { - - private RangePartitioner rangePartitioner = new RangePartitioner(); - private Configuration conf; - - public Configuration getConf() { - return conf; - } - - public void setConf(Configuration conf) { - this.conf = conf; - rangePartitioner.setConf(conf); - } - - @Override - public int getPartition(Key key, Value value, int numReducers) { - return rangePartitioner.getPartition(key.getRow(), value, numReducers); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/filters/TimeRangeFilter.java ---------------------------------------------------------------------- diff --git a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/filters/TimeRangeFilter.java b/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/filters/TimeRangeFilter.java deleted file mode 100644 index 965aa7f..0000000 --- a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/filters/TimeRangeFilter.java +++ /dev/null @@ -1,64 +0,0 @@ -package mvm.rya.cloudbase.utils.filters; - -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.OptionDescriber; -import cloudbase.core.iterators.filter.Filter; - -import java.util.Map; -import java.util.TreeMap; - -/** - * Set the startTime and timeRange. The filter will only keyValues that - * are within the range [startTime - timeRange, startTime]. 
- * - * @deprecated Use the LimitingAgeOffFilter - */ -public class TimeRangeFilter implements Filter, OptionDescriber { - private long timeRange; - private long startTime; - public static final String TIME_RANGE_PROP = "timeRange"; - public static final String START_TIME_PROP = "startTime"; - - @Override - public boolean accept(Key k, Value v) { - long diff = startTime - k.getTimestamp(); - return !(diff > timeRange || diff < 0); - } - - @Override - public void init(Map<String, String> options) { - if (options == null) { - throw new IllegalArgumentException("options must be set for TimeRangeFilter"); - } - - timeRange = -1; - String timeRange_s = options.get(TIME_RANGE_PROP); - if (timeRange_s == null) - throw new IllegalArgumentException("timeRange must be set for TimeRangeFilter"); - - timeRange = Long.parseLong(timeRange_s); - - String time = options.get(START_TIME_PROP); - if (time != null) - startTime = Long.parseLong(time); - else - startTime = System.currentTimeMillis(); - } - - @Override - public IteratorOptions describeOptions() { - Map<String, String> options = new TreeMap<String, String>(); - options.put(TIME_RANGE_PROP, "time range from the startTime (milliseconds)"); - options.put(START_TIME_PROP, "if set, use the given value as the absolute time in milliseconds as the start time in the time range."); - return new OptionDescriber.IteratorOptions("timeRangeFilter", "TimeRangeFilter removes entries with timestamps outside of the given time range: " + - "[startTime - timeRange, startTime]", - options, null); - } - - @Override - public boolean validateOptions(Map<String, String> options) { - Long.parseLong(options.get(TIME_RANGE_PROP)); - return true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/input/CloudbaseBatchScannerInputFormat.java ---------------------------------------------------------------------- diff --git a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/input/CloudbaseBatchScannerInputFormat.java b/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/input/CloudbaseBatchScannerInputFormat.java deleted file mode 100644 index b7a1c84..0000000 --- a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/input/CloudbaseBatchScannerInputFormat.java +++ /dev/null @@ -1,872 +0,0 @@ -package mvm.rya.cloudbase.utils.input; - -import cloudbase.core.CBConstants; -import cloudbase.core.client.*; -import cloudbase.core.client.impl.Tables; -import cloudbase.core.client.impl.TabletLocator; -import cloudbase.core.data.*; -import cloudbase.core.security.Authorizations; -import cloudbase.core.security.TablePermission; -import cloudbase.core.security.thrift.AuthInfo; -import cloudbase.core.util.ArgumentChecker; -import cloudbase.core.util.Pair; -import cloudbase.core.util.TextUtil; -import cloudbase.core.util.UtilWaitThread; -import cloudbase.core.util.format.DefaultFormatter; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.*; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -import java.io.*; -import java.lang.reflect.InvocationTargetException; -import java.math.BigInteger; -import java.net.InetAddress; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.util.*; -import java.util.Map.Entry; - -/** - * This class allows MapReduce jobs to use Cloudbase as 
the source of data. This - * input format provides keys and values of type Key and Value to the Map() and - * Reduce() functions. - * - * The user must specify the following via static methods: - * - * <ul> - * <li>CloudbaseInputFormat.setInputTableInfo(job, username, password, table, - * auths) - * <li>CloudbaseInputFormat.setZooKeeperInstance(job, instanceName, hosts) - * </ul> - * - * Other static methods are optional - */ -public class CloudbaseBatchScannerInputFormat extends InputFormat<Key, Value> { - private static final Logger log = Logger.getLogger(CloudbaseBatchScannerInputFormat.class); - - private static final String PREFIX = CloudbaseBatchScannerInputFormat.class.getSimpleName(); - private static final String INPUT_INFO_HAS_BEEN_SET = PREFIX + ".configured"; - private static final String INSTANCE_HAS_BEEN_SET = PREFIX + ".instanceConfigured"; - private static final String USERNAME = PREFIX + ".username"; - private static final String PASSWORD = PREFIX + ".password"; - private static final String TABLE_NAME = PREFIX + ".tablename"; - private static final String AUTHORIZATIONS = PREFIX + ".authorizations"; - - private static final String INSTANCE_NAME = PREFIX + ".instanceName"; - private static final String ZOOKEEPERS = PREFIX + ".zooKeepers"; - private static final String MOCK = ".useMockInstance"; - - private static final String RANGES = PREFIX + ".ranges"; - private static final String AUTO_ADJUST_RANGES = PREFIX + ".ranges.autoAdjust"; - - private static final String ROW_REGEX = PREFIX + ".regex.row"; - private static final String COLUMN_FAMILY_REGEX = PREFIX + ".regex.cf"; - private static final String COLUMN_QUALIFIER_REGEX = PREFIX + ".regex.cq"; - private static final String VALUE_REGEX = PREFIX + ".regex.value"; - - private static final String COLUMNS = PREFIX + ".columns"; - private static final String LOGLEVEL = PREFIX + ".loglevel"; - - private static final String ISOLATED = PREFIX + ".isolated"; - - //Used to specify the maximum # of versions of a Cloudbase cell value to return - private static final String MAX_VERSIONS = PREFIX + ".maxVersions"; - - //Used for specifying the iterators to be applied - private static final String ITERATORS = PREFIX + ".iterators"; - private static final String ITERATORS_OPTIONS = PREFIX + ".iterators.options"; - private static final String ITERATORS_DELIM = ","; - private BatchScanner bScanner; - - /** - * Enable or disable use of the {@link cloudbase.core.client.IsolatedScanner}. By default it is not enabled. 
- * - * @param job - * @param enable - */ - public static void setIsolated(JobContext job, boolean enable){ - Configuration conf = job.getConfiguration(); - conf.setBoolean(ISOLATED, enable); - } - - public static void setInputInfo(JobContext job, String user, byte[] passwd, String table, Authorizations auths) { - Configuration conf = job.getConfiguration(); - if (conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false)) - throw new IllegalStateException("Input info can only be set once per job"); - conf.setBoolean(INPUT_INFO_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(user, passwd, table); - conf.set(USERNAME, user); - conf.set(PASSWORD, new String(Base64.encodeBase64(passwd))); - conf.set(TABLE_NAME, table); - if (auths != null && !auths.isEmpty()) - conf.set(AUTHORIZATIONS, auths.serialize()); - } - - public static void setZooKeeperInstance(JobContext job, String instanceName, String zooKeepers) { - Configuration conf = job.getConfiguration(); - if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) - throw new IllegalStateException("Instance info can only be set once per job"); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - - ArgumentChecker.notNull(instanceName, zooKeepers); - conf.set(INSTANCE_NAME, instanceName); - conf.set(ZOOKEEPERS, zooKeepers); - } - - public static void setMockInstance(JobContext job, String instanceName) { - Configuration conf = job.getConfiguration(); - conf.setBoolean(INSTANCE_HAS_BEEN_SET, true); - conf.setBoolean(MOCK, true); - conf.set(INSTANCE_NAME, instanceName); - } - - public static void setRanges(JobContext job, Collection<Range> ranges) { - ArgumentChecker.notNull(ranges); - ArrayList<String> rangeStrings = new ArrayList<String>(ranges.size()); - try { - for (Range r : ranges) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - r.write(new DataOutputStream(baos)); - rangeStrings.add(new String(Base64.encodeBase64(baos.toByteArray()))); - } - } catch (IOException ex) { - throw new IllegalArgumentException("Unable to encode ranges to Base64", ex); - } - job.getConfiguration().setStrings(RANGES, rangeStrings.toArray(new String[0])); - } - - public static void disableAutoAdjustRanges(JobContext job) { - job.getConfiguration().setBoolean(AUTO_ADJUST_RANGES, false); - } - - public static enum RegexType { - ROW, COLUMN_FAMILY, COLUMN_QUALIFIER, VALUE - } - - public static void setRegex(JobContext job, RegexType type, String regex) { - ArgumentChecker.notNull(type, regex); - String key = null; - switch (type) { - case ROW: - key = ROW_REGEX; - break; - case COLUMN_FAMILY: - key = COLUMN_FAMILY_REGEX; - break; - case COLUMN_QUALIFIER: - key = COLUMN_QUALIFIER_REGEX; - break; - case VALUE: - key = VALUE_REGEX; - break; - default: - throw new NoSuchElementException(); - } - try { - job.getConfiguration().set(key, URLEncoder.encode(regex, "UTF-8")); - } catch (UnsupportedEncodingException e) { - log.error("Failedd to encode regular expression",e); - throw new RuntimeException(e); - } - } - - - - - /** - * Sets the max # of values that may be returned for an individual Cloudbase cell. By default, applied before all other - * Cloudbase iterators (highest priority) leveraged in the scan by the record reader. To adjust priority use - * setIterator() & setIteratorOptions() w/ the VersioningIterator type explicitly. 
- * - * @param job the job - * @param maxVersions the max versions - * @throws java.io.IOException - */ - public static void setMaxVersions(JobContext job, int maxVersions) throws IOException{ - if (maxVersions < 1) throw new IOException("Invalid maxVersions: " + maxVersions + ". Must be >= 1"); - job.getConfiguration().setInt(MAX_VERSIONS, maxVersions); - } - - /** - * - * @param columnFamilyColumnQualifierPairs - * A pair of {@link org.apache.hadoop.io.Text} objects corresponding to column family - * and column qualifier. If the column qualifier is null, the - * entire column family is selected. An empty set is the default - * and is equivalent to scanning the all columns. - */ - public static void fetchColumns(JobContext job, Collection<Pair<Text, Text>> columnFamilyColumnQualifierPairs) { - ArgumentChecker.notNull(columnFamilyColumnQualifierPairs); - ArrayList<String> columnStrings = new ArrayList<String>(columnFamilyColumnQualifierPairs.size()); - for (Pair<Text, Text> column : columnFamilyColumnQualifierPairs) { - if(column.getFirst() == null) - throw new IllegalArgumentException("Column family can not be null"); - - String col = new String(Base64.encodeBase64(TextUtil.getBytes(column.getFirst()))); - if (column.getSecond() != null) - col += ":" + new String(Base64.encodeBase64(TextUtil.getBytes(column.getSecond()))); - columnStrings.add(col); - } - job.getConfiguration().setStrings(COLUMNS, columnStrings.toArray(new String[0])); - } -// -// public static void setLogLevel(JobContext job, Level level) { -// ArgumentChecker.notNull(level); -// log.setLevel(level); -// job.getConfiguration().setInt(LOGLEVEL, level.toInt()); -// } - - - /** - * Specify a Cloudbase iterator type to manage the behavior of the underlying table scan this InputFormat's Record Reader will conduct, w/ priority dictating the order - * in which specified iterators are applied. Repeat calls to specify multiple iterators are allowed. - * - * @param job the job - * @param priority the priority - * @param iteratorClass the iterator class - * @param iteratorName the iterator name - */ - public static void setIterator(JobContext job, int priority, String iteratorClass, String iteratorName){ - //First check to see if anything has been set already - String iterators = job.getConfiguration().get(ITERATORS); - - //No iterators specified yet, create a new string - if (iterators == null || iterators.isEmpty()) { - iterators = new CBIterator(priority, iteratorClass, iteratorName).toString(); - } - else { - //append the next iterator & reset - iterators = iterators.concat(ITERATORS_DELIM + new CBIterator(priority, iteratorClass, iteratorName).toString()); - } - //Store the iterators w/ the job - job.getConfiguration().set(ITERATORS, iterators); - } - - - /** - * Specify an option for a named Cloudbase iterator, further specifying that iterator's - * behavior. - * - * @param job the job - * @param iteratorName the iterator name. Should correspond to an iterator set w/ a prior setIterator call. 
- * @param key the key - * @param value the value - */ - public static void setIteratorOption(JobContext job, String iteratorName, String key, String value){ - if (value == null) return; - - String iteratorOptions = job.getConfiguration().get(ITERATORS_OPTIONS); - - //No options specified yet, create a new string - if (iteratorOptions == null || iteratorOptions.isEmpty()){ - iteratorOptions = new CBIteratorOption(iteratorName, key, value).toString(); - } - else { - //append the next option & reset - iteratorOptions = iteratorOptions.concat(ITERATORS_DELIM + new CBIteratorOption(iteratorName, key, value)); - } - - //Store the options w/ the job - job.getConfiguration().set(ITERATORS_OPTIONS, iteratorOptions); - } - - protected static boolean isIsolated(JobContext job){ - return job.getConfiguration().getBoolean(ISOLATED, false); - } - - protected static String getUsername(JobContext job) { - return job.getConfiguration().get(USERNAME); - } - - - /** - * WARNING: The password is stored in the Configuration and shared with all - * MapReduce tasks; It is BASE64 encoded to provide a charset safe - * conversion to a string, and is not intended to be secure. - */ - protected static byte[] getPassword(JobContext job) { - return Base64.decodeBase64(job.getConfiguration().get(PASSWORD, "").getBytes()); - } - - protected static String getTablename(JobContext job) { - return job.getConfiguration().get(TABLE_NAME); - } - - protected static Authorizations getAuthorizations(JobContext job) { - String authString = job.getConfiguration().get(AUTHORIZATIONS); - return authString == null ? CBConstants.NO_AUTHS : new Authorizations(authString.split(",")); - } - - protected static Instance getInstance(JobContext job) { - Configuration conf = job.getConfiguration(); -// if (conf.getBoolean(MOCK, false)) -// return new MockInstance(conf.get(INSTANCE_NAME)); - return new ZooKeeperInstance(conf.get(INSTANCE_NAME), conf.get(ZOOKEEPERS)); - } - - protected static TabletLocator getTabletLocator(JobContext job) throws TableNotFoundException { -// if (job.getConfiguration().getBoolean(MOCK, false)) -// return new MockTabletLocator(); - Instance instance = getInstance(job); - String username = getUsername(job); - byte[] password = getPassword(job); - String tableName = getTablename(job); - return TabletLocator.getInstance(instance, new AuthInfo(username, password, instance.getInstanceID()), new Text(Tables.getTableId(instance, tableName))); - } - - protected static List<Range> getRanges(JobContext job) throws IOException { - ArrayList<Range> ranges = new ArrayList<Range>(); - for (String rangeString : job.getConfiguration().getStringCollection(RANGES)) { - ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(rangeString.getBytes())); - Range range = new Range(); - range.readFields(new DataInputStream(bais)); - ranges.add(range); - } - return ranges; - } - - protected static String getRegex(JobContext job, RegexType type) { - String key = null; - switch (type) { - case ROW: - key = ROW_REGEX; - break; - case COLUMN_FAMILY: - key = COLUMN_FAMILY_REGEX; - break; - case COLUMN_QUALIFIER: - key = COLUMN_QUALIFIER_REGEX; - break; - case VALUE: - key = VALUE_REGEX; - break; - default: - throw new NoSuchElementException(); - } - try { - String s = job.getConfiguration().get(key); - if(s == null) - return null; - return URLDecoder.decode(s,"UTF-8"); - } catch (UnsupportedEncodingException e) { - log.error("Failed to decode regular expression", e); - throw new RuntimeException(e); - } - } - - protected static 
-  protected static Set<Pair<Text, Text>> getFetchedColumns(JobContext job) {
-    Set<Pair<Text, Text>> columns = new HashSet<Pair<Text, Text>>();
-    for (String col : job.getConfiguration().getStringCollection(COLUMNS)) {
-      int idx = col.indexOf(":");
-      Text cf = new Text(idx < 0 ? Base64.decodeBase64(col.getBytes()) : Base64.decodeBase64(col.substring(0, idx).getBytes()));
-      Text cq = idx < 0 ? null : new Text(Base64.decodeBase64(col.substring(idx + 1).getBytes()));
-      columns.add(new Pair<Text, Text>(cf, cq));
-    }
-    return columns;
-  }
-
-  protected static boolean getAutoAdjustRanges(JobContext job) {
-    return job.getConfiguration().getBoolean(AUTO_ADJUST_RANGES, true);
-  }
-
-  protected static Level getLogLevel(JobContext job) {
-    return Level.toLevel(job.getConfiguration().getInt(LOGLEVEL, Level.INFO.toInt()));
-  }
-
-  // InputFormat doesn't have the equivalent of OutputFormat's
-  // checkOutputSpecs(JobContext job)
-  protected static void validateOptions(JobContext job) throws IOException {
-    Configuration conf = job.getConfiguration();
-    if (!conf.getBoolean(INPUT_INFO_HAS_BEEN_SET, false))
-      throw new IOException("Input info has not been set.");
-    if (!conf.getBoolean(INSTANCE_HAS_BEEN_SET, false))
-      throw new IOException("Instance info has not been set.");
-    // validate that we can connect as configured
-    try {
-      Connector c = getInstance(job).getConnector(getUsername(job), getPassword(job));
-      if (!c.securityOperations().authenticateUser(getUsername(job), getPassword(job)))
-        throw new IOException("Unable to authenticate user");
-      if (!c.securityOperations().hasTablePermission(getUsername(job), getTablename(job), TablePermission.READ))
-        throw new IOException("Unable to access table");
-    } catch (CBException e) {
-      throw new IOException(e);
-    } catch (CBSecurityException e) {
-      throw new IOException(e);
-    }
-  }
-
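getFetchedColumns parses entries of the form base64(cf) or base64(cf):base64(cq), with a missing qualifier meaning "fetch the whole column family". The inverse operation that a job-setup helper would perform looks roughly like the sketch below; the COLUMNS key and the fetchColumns name are assumptions, and the Accumulo Pair/Text classes stand in for their Cloudbase twins:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.accumulo.core.util.Pair;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

public class ColumnConfigSketch {
  private static final String COLUMNS = "cb.columns"; // hypothetical key name

  public static void fetchColumns(Configuration conf, Collection<Pair<Text, Text>> columns) {
    List<String> encoded = new ArrayList<String>();
    for (Pair<Text, Text> col : columns) {
      String cf = new String(Base64.encodeBase64(copy(col.getFirst())));
      // a null qualifier means "whole column family", so no ':' separator is written
      encoded.add(col.getSecond() == null ? cf
          : cf + ":" + new String(Base64.encodeBase64(copy(col.getSecond()))));
    }
    conf.setStrings(COLUMNS, encoded.toArray(new String[0]));
  }

  private static byte[] copy(Text t) {
    return Arrays.copyOf(t.getBytes(), t.getLength()); // Text's backing array may be oversized
  }
}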
-  //Get the maxVersions the VersioningIterator should be configured with. Return -1 if none.
-  protected static int getMaxVersions(JobContext job) {
-    return job.getConfiguration().getInt(MAX_VERSIONS, -1);
-  }
-
-
-  //Return a list of the iterator settings (for iterators to apply to a scanner)
-  protected static List<CBIterator> getIterators(JobContext job){
-
-    String iterators = job.getConfiguration().get(ITERATORS);
-
-    //If no iterators are present, return an empty list
-    if (iterators == null || iterators.isEmpty()) return new ArrayList<CBIterator>();
-
-    //Compose the set of iterators encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(job.getConfiguration().get(ITERATORS), ITERATORS_DELIM);
-    List<CBIterator> list = new ArrayList<CBIterator>();
-    while (tokens.hasMoreTokens()){
-      String itstring = tokens.nextToken();
-      list.add(new CBIterator(itstring));
-    }
-    return list;
-  }
-
-
-  //Return a list of the iterator options specified
-  protected static List<CBIteratorOption> getIteratorOptions(JobContext job){
-    String iteratorOptions = job.getConfiguration().get(ITERATORS_OPTIONS);
-
-    //If no options are present, return an empty list
-    if (iteratorOptions == null || iteratorOptions.isEmpty()) return new ArrayList<CBIteratorOption>();
-
-    //Compose the set of options encoded in the job configuration
-    StringTokenizer tokens = new StringTokenizer(job.getConfiguration().get(ITERATORS_OPTIONS), ITERATORS_DELIM);
-    List<CBIteratorOption> list = new ArrayList<CBIteratorOption>();
-    while (tokens.hasMoreTokens()){
-      String optionString = tokens.nextToken();
-      list.add(new CBIteratorOption(optionString));
-    }
-    return list;
-  }
-
-  @Override
-  public RecordReader<Key, Value> createRecordReader(InputSplit inSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
-//    log.setLevel(getLogLevel(attempt));
-    return new RecordReader<Key, Value>() {
-      private int recordsRead;
-      private Iterator<Entry<Key, Value>> scannerIterator;
-      private boolean scannerRegexEnabled = false;
-      private RangeInputSplit split;
-
-      private void checkAndEnableRegex(String regex, BatchScanner scanner, String CBIMethodName) throws IllegalArgumentException, SecurityException, IllegalAccessException, InvocationTargetException, NoSuchMethodException, IOException {
-        if (regex != null) {
-          if (scannerRegexEnabled == false) {
-            scanner.setupRegex(PREFIX + ".regex.iterator", 50);
-            scannerRegexEnabled = true;
-          }
-          scanner.getClass().getMethod(CBIMethodName, String.class).invoke(scanner, regex);
-          log.info("Setting " + CBIMethodName + " to " + regex);
-        }
-      }
-
-      private boolean setupRegex(TaskAttemptContext attempt, BatchScanner scanner) throws CBException {
-        try {
-          checkAndEnableRegex(getRegex(attempt, RegexType.ROW), scanner, "setRowRegex");
-          checkAndEnableRegex(getRegex(attempt, RegexType.COLUMN_FAMILY), scanner, "setColumnFamilyRegex");
-          checkAndEnableRegex(getRegex(attempt, RegexType.COLUMN_QUALIFIER), scanner, "setColumnQualifierRegex");
-          checkAndEnableRegex(getRegex(attempt, RegexType.VALUE), scanner, "setValueRegex");
-          return true;
-        } catch (Exception e) {
-          throw new CBException("Can't set up regex for scanner");
-        }
-      }
-
-      //Apply the configured iterators from the job to the scanner
-      private void setupIterators(TaskAttemptContext attempt, BatchScanner scanner) throws CBException {
-        List<CBIterator> iterators = getIterators(attempt);
-        List<CBIteratorOption> options = getIteratorOptions(attempt);
-
-        //Loop through the iterators & options, wiring them up to the scanner.
-        try {
-          for (CBIterator iterator : iterators){
-            scanner.setScanIterators(iterator.getPriority(), iterator.getIteratorClass(), iterator.getIteratorName());
-          }
-          for (CBIteratorOption option : options){
-            scanner.setScanIteratorOption(option.getIteratorName(), option.getKey(), option.getValue());
-          }
-        }
-        catch (Exception e) {
-          throw new CBException(e);
-        }
-      }
-
-      //Apply the VersioningIterator at priority 0 based on the job config
-      private void setupMaxVersions(TaskAttemptContext attempt, BatchScanner scanner) throws CBException {
-        int maxVersions = getMaxVersions(attempt);
-        //Check to make sure it's a legit value
-        if (maxVersions >= 1) {
-          try {
-            scanner.setScanIterators(0, cloudbase.core.iterators.VersioningIterator.class.getName(), "vers");
-          }
-          catch (Exception e){
-            throw new CBException(e);
-          }
-          scanner.setScanIteratorOption("vers", "maxVersions", new Integer(maxVersions).toString());
-        }
-      }
-
-      public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
-        split = (RangeInputSplit) inSplit;
-        log.debug("Initializing input split: " + split.range);
-        Instance instance = getInstance(attempt);
-        String user = getUsername(attempt);
-        byte[] password = getPassword(attempt);
-        Authorizations authorizations = getAuthorizations(attempt);
-
-        try {
-          log.debug("Creating connector with user: " + user);
-          Connector conn = instance.getConnector(user, password);
-          log.debug("Creating scanner for table: " + getTablename(attempt));
-          log.debug("Authorizations are: " + authorizations);
-          bScanner = conn.createBatchScanner(getTablename(attempt), authorizations, 10);
-//          if (isIsolated(attempt)){
-//            log.info("Creating isolated scanner");
-//            bScanner = new IsolatedScanner(bScanner);
-//          }
-          setupMaxVersions(attempt, bScanner);
-          setupRegex(attempt, bScanner);
-          setupIterators(attempt, bScanner);
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
-
-        // setup a scanner within the bounds of this split
-        for (Pair<Text, Text> c : getFetchedColumns(attempt)) {
-          if (c.getSecond() != null)
-            bScanner.fetchColumn(c.getFirst(), c.getSecond());
-          else
-            bScanner.fetchColumnFamily(c.getFirst());
-        }
-
-        bScanner.setRanges(Collections.singleton(split.range));
-
-        recordsRead = 0;
-
-        // do this last after setting all scanner options
-        scannerIterator = bScanner.iterator();
-      }
-
-      public void close() {
-        bScanner.close();
-      }
-
-      public float getProgress() throws IOException {
-        if (recordsRead > 0 && currentKey == null)
-          return 1.0f;
-        return split.getProgress(currentKey);
-      }
-
-      private Key currentKey = null;
-      private Value currentValue = null;
-
-      @Override
-      public Key getCurrentKey() throws IOException, InterruptedException {
-        return currentKey;
-      }
-
-      @Override
-      public Value getCurrentValue() throws IOException, InterruptedException {
-        return currentValue;
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (scannerIterator.hasNext()) {
-          ++recordsRead;
-          Entry<Key, Value> entry = scannerIterator.next();
-          currentKey = entry.getKey();
-          currentValue = entry.getValue();
-          if (log.isTraceEnabled())
-            log.trace("Processing key/value pair: " + DefaultFormatter.formatEntry(entry, true));
-          return true;
-        }
-        return false;
-      }
-    };
-  }
-
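checkAndEnableRegex above drives four different scanner methods (setRowRegex, setColumnFamilyRegex, and so on) through a single code path by looking the setter up by name. A self-contained illustration of that reflective dispatch; RegexScanner and PrintingScanner are stand-ins invented for this sketch, not Cloudbase types:

import java.lang.reflect.Method;

public class ReflectiveRegexDemo {
  interface RegexScanner {
    void setRowRegex(String regex);
  }

  static class PrintingScanner implements RegexScanner {
    public void setRowRegex(String regex) {
      System.out.println("row regex set to " + regex);
    }
  }

  static void checkAndEnable(Object scanner, String methodName, String regex) throws Exception {
    if (regex == null) return; // mirror the null guard above: nothing configured, nothing to do
    Method m = scanner.getClass().getMethod(methodName, String.class);
    m.invoke(scanner, regex);  // e.g. equivalent to scanner.setRowRegex(regex)
  }

  public static void main(String[] args) throws Exception {
    checkAndEnable(new PrintingScanner(), "setRowRegex", "row.*");
  }
}

The reflective lookup trades compile-time checking for one shared helper; a typo in the method name only surfaces at runtime, which is why setupRegex wraps the calls in a broad catch.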
-  /**
-   * Read the metadata table to get the tablets of interest; each becomes a split.
-   */
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-//    log.setLevel(getLogLevel(job));
-    validateOptions(job);
-
-    String tableName = getTablename(job);
-    boolean autoAdjust = getAutoAdjustRanges(job);
-    List<Range> ranges = autoAdjust ? Range.mergeOverlapping(getRanges(job)) : getRanges(job);
-
-    if (ranges.isEmpty()) {
-      ranges = new ArrayList<Range>(1);
-      ranges.add(new Range());
-    }
-
-    // get the metadata information for these ranges
-    Map<String, Map<KeyExtent, List<Range>>> binnedRanges = new HashMap<String, Map<KeyExtent, List<Range>>>();
-    TabletLocator tl;
-    try {
-      tl = getTabletLocator(job);
-      while (!tl.binRanges(ranges, binnedRanges).isEmpty()) {
-        log.warn("Unable to locate bins for specified ranges. Retrying.");
-        UtilWaitThread.sleep(100 + (int) (Math.random() * 100)); // sleep randomly between 100 and 200 ms
-      }
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    ArrayList<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
-    HashMap<Range, ArrayList<String>> splitsToAdd = null;
-
-    if (!autoAdjust)
-      splitsToAdd = new HashMap<Range, ArrayList<String>>();
-
-    HashMap<String, String> hostNameCache = new HashMap<String, String>();
-
-    for (Entry<String, Map<KeyExtent, List<Range>>> tserverBin : binnedRanges.entrySet()) {
-      String ip = tserverBin.getKey().split(":", 2)[0];
-      String location = hostNameCache.get(ip);
-      if (location == null) {
-        InetAddress inetAddress = InetAddress.getByName(ip);
-        location = inetAddress.getHostName();
-        hostNameCache.put(ip, location);
-      }
-
-      for (Entry<KeyExtent, List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
-        Range ke = extentRanges.getKey().toDataRange();
-        for (Range r : extentRanges.getValue()) {
-          if (autoAdjust) {
-            // divide ranges into smaller ranges, based on the tablets
-            splits.add(new RangeInputSplit(tableName, ke.clip(r), new String[] { location }));
-          } else {
-            // don't divide ranges
-            ArrayList<String> locations = splitsToAdd.get(r);
-            if (locations == null)
-              locations = new ArrayList<String>(1);
-            locations.add(location);
-            splitsToAdd.put(r, locations);
-          }
-        }
-      }
-    }
-
-    if (!autoAdjust)
-      for (Entry<Range, ArrayList<String>> entry : splitsToAdd.entrySet())
-        splits.add(new RangeInputSplit(tableName, entry.getKey(), entry.getValue().toArray(new String[0])));
-    return splits;
-  }
-
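With autoAdjust enabled, getSplits first collapses overlapping ranges and then clips each merged range to the tablets that hold it, so every split is local to one tablet server. A small demonstration of the two Range operations involved, written against the Accumulo Range that mirrors Cloudbase's (the row values are invented):

import java.util.Arrays;
import java.util.List;

import org.apache.accumulo.core.data.Range;

public class RangeSplitDemo {
  public static void main(String[] args) {
    Range a = new Range("a", "m");
    Range b = new Range("k", "z");
    // overlapping input ranges collapse into one scan range covering a..z
    List<Range> merged = Range.mergeOverlapping(Arrays.asList(a, b));
    // a tablet's extent then clips the merged range to the data that tablet holds
    Range tablet = new Range("f", "p");
    System.out.println(tablet.clip(merged.get(0))); // the intersection, f..p
  }
}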
-  /**
-   * The Class RangeInputSplit. Encapsulates a Cloudbase range for use in MapReduce jobs.
-   */
-  public static class RangeInputSplit extends InputSplit implements Writable {
-    private Range range;
-    private String[] locations;
-
-    public RangeInputSplit() {
-      range = new Range();
-      locations = new String[0];
-    }
-
-    private static byte[] extractBytes(ByteSequence seq, int numBytes) {
-      byte[] bytes = new byte[numBytes + 1];
-      bytes[0] = 0;
-      for (int i = 0; i < numBytes; i++) {
-        if (i >= seq.length())
-          bytes[i + 1] = 0;
-        else
-          bytes[i + 1] = seq.byteAt(i);
-      }
-      return bytes;
-    }
-
-    public static float getProgress(ByteSequence start, ByteSequence end, ByteSequence position) {
-      int maxDepth = Math.min(Math.max(end.length(), start.length()), position.length());
-      BigInteger startBI = new BigInteger(extractBytes(start, maxDepth));
-      BigInteger endBI = new BigInteger(extractBytes(end, maxDepth));
-      BigInteger positionBI = new BigInteger(extractBytes(position, maxDepth));
-      return (float) (positionBI.subtract(startBI).doubleValue() / endBI.subtract(startBI).doubleValue());
-    }
-
-    public float getProgress(Key currentKey) {
-      if (currentKey == null)
-        return 0f;
-      if (range.getStartKey() != null && range.getEndKey() != null) {
-        if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW) != 0) {
-          // just look at the row progress
-          return getProgress(range.getStartKey().getRowData(), range.getEndKey().getRowData(), currentKey.getRowData());
-        }
-        else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM) != 0) {
-          // just look at the column family progress
-          return getProgress(range.getStartKey().getColumnFamilyData(), range.getEndKey().getColumnFamilyData(), currentKey.getColumnFamilyData());
-        }
-        else if (range.getStartKey().compareTo(range.getEndKey(), PartialKey.ROW_COLFAM_COLQUAL) != 0) {
-          // just look at the column qualifier progress
-          return getProgress(range.getStartKey().getColumnQualifierData(), range.getEndKey().getColumnQualifierData(), currentKey.getColumnQualifierData());
-        }
-      }
-      // if we can't figure it out, then claim no progress
-      return 0f;
-    }
-
-    RangeInputSplit(String table, Range range, String[] locations) {
-      this.range = range;
-      this.locations = locations;
-    }
-
-    /**
-     * @deprecated Since 1.3; don't use this method to compute any reasonable distance metric.
-     */
-    @Deprecated
-    public long getLength() throws IOException {
-      Text startRow = range.isInfiniteStartKey() ? new Text(new byte[] { Byte.MIN_VALUE }) : range.getStartKey().getRow();
-      Text stopRow = range.isInfiniteStopKey() ? new Text(new byte[] { Byte.MAX_VALUE }) : range.getEndKey().getRow();
-      int maxCommon = Math.min(7, Math.min(startRow.getLength(), stopRow.getLength()));
-      long diff = 0;
-
-      byte[] start = startRow.getBytes();
-      byte[] stop = stopRow.getBytes();
-      for (int i = 0; i < maxCommon; ++i) {
-        diff |= 0xff & (start[i] ^ stop[i]);
-        diff <<= Byte.SIZE;
-      }
-
-      if (startRow.getLength() != stopRow.getLength())
-        diff |= 0xff;
-
-      return diff + 1;
-    }
-
-    public String[] getLocations() throws IOException {
-      return locations;
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      range.readFields(in);
-      int numLocs = in.readInt();
-      locations = new String[numLocs];
-      for (int i = 0; i < numLocs; ++i)
-        locations[i] = in.readUTF();
-    }
-
-    public void write(DataOutput out) throws IOException {
-      range.write(out);
-      out.writeInt(locations.length);
-      for (int i = 0; i < locations.length; ++i)
-        out.writeUTF(locations[i]);
-    }
-  }
-
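The progress estimate above treats the first maxDepth bytes of the start, end, and current position as unsigned big-endian integers and interpolates linearly between them; the leading zero byte that extractBytes prepends keeps BigInteger from reading a high bit as a sign. A self-contained restatement of that arithmetic, which can be run directly:

import java.math.BigInteger;

public class ProgressMath {
  static byte[] pad(byte[] seq, int n) {
    byte[] out = new byte[n + 1];          // out[0] stays 0, forcing a non-negative value
    for (int i = 0; i < n && i < seq.length; i++) out[i + 1] = seq[i];
    return out;                            // bytes past the sequence end are 0-filled
  }

  public static float progress(byte[] start, byte[] end, byte[] pos) {
    int depth = Math.min(Math.max(end.length, start.length), pos.length);
    BigInteger s = new BigInteger(pad(start, depth));
    BigInteger e = new BigInteger(pad(end, depth));
    BigInteger p = new BigInteger(pad(pos, depth));
    return (float) (p.subtract(s).doubleValue() / e.subtract(s).doubleValue());
  }

  public static void main(String[] args) {
    // row "b" is halfway between rows "a" and "c", so this prints 0.5
    System.out.println(progress(new byte[]{'a'}, new byte[]{'c'}, new byte[]{'b'}));
  }
}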
-  /**
-   * The Class CBIterator. Encapsulates specifics for a Cloudbase iterator's name & priority.
-   */
-  static class CBIterator {
-
-    private static final String FIELD_SEP = ":";
-
-    private int priority;
-    private String iteratorClass;
-    private String iteratorName;
-
-
-    public CBIterator(int priority, String iteratorClass, String iteratorName){
-      this.priority = priority;
-      this.iteratorClass = iteratorClass;
-      this.iteratorName = iteratorName;
-    }
-
-    //Parses out a setting given a string supplied from an earlier toString() call
-    public CBIterator(String iteratorSetting){
-      //Parse the string to expand the iterator
-      StringTokenizer tokenizer = new StringTokenizer(iteratorSetting, FIELD_SEP);
-      priority = Integer.parseInt(tokenizer.nextToken());
-      iteratorClass = tokenizer.nextToken();
-      iteratorName = tokenizer.nextToken();
-    }
-
-    public int getPriority() {
-      return priority;
-    }
-
-    public String getIteratorClass() {
-      return iteratorClass;
-    }
-
-    public String getIteratorName() {
-      return iteratorName;
-    }
-
-    @Override
-    public String toString(){
-      return priority + FIELD_SEP + iteratorClass + FIELD_SEP + iteratorName;
-    }
-
-  }
-
-  /**
-   * The Class CBIteratorOption. Encapsulates specifics for a Cloudbase iterator's optional configuration
-   * details - associated via the iteratorName.
-   */
-  static class CBIteratorOption {
-    private static final String FIELD_SEP = ":";
-
-    private String iteratorName;
-    private String key;
-    private String value;
-
-    public CBIteratorOption(String iteratorName, String key, String value){
-      this.iteratorName = iteratorName;
-      this.key = key;
-      this.value = value;
-    }
-
-    //Parses out an option given a string supplied from an earlier toString() call
-    public CBIteratorOption(String iteratorOption){
-      StringTokenizer tokenizer = new StringTokenizer(iteratorOption, FIELD_SEP);
-      this.iteratorName = tokenizer.nextToken();
-      this.key = tokenizer.nextToken();
-      this.value = tokenizer.nextToken();
-    }
-
-    public String getIteratorName() {
-      return iteratorName;
-    }
-
-    public String getKey() {
-      return key;
-    }
-
-    public String getValue() {
-      return value;
-    }
-
-    @Override
-    public String toString() {
-      return iteratorName + FIELD_SEP + key + FIELD_SEP + value;
-    }
-
-  }
-
-}
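Both helper classes round-trip through toString() and the colon-delimited constructor; getIterators and getIteratorOptions earlier in the file simply split the job configuration on ITERATORS_DELIM and feed each token back through these constructors. One caveat worth noting: a key or value containing ':' would break the StringTokenizer-based parse. A standalone round-trip check mirroring the name:key:value layout of CBIteratorOption (which is package-private, so this sketch re-creates the format rather than importing it):

import java.util.StringTokenizer;

public class OptionRoundTrip {
  public static void main(String[] args) {
    String encoded = "vers" + ":" + "maxVersions" + ":" + "3"; // iteratorName:key:value
    StringTokenizer t = new StringTokenizer(encoded, ":");
    System.out.println("name=" + t.nextToken()
        + " key=" + t.nextToken()
        + " value=" + t.nextToken());
  }
}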
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/scanner/BatchScannerList.java
----------------------------------------------------------------------
diff --git a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/scanner/BatchScannerList.java b/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/scanner/BatchScannerList.java
deleted file mode 100644
index 76b9e22..0000000
--- a/utils/cloudbase.utils/src/main/java/mvm/rya/cloudbase/utils/scanner/BatchScannerList.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package mvm.rya.cloudbase.utils.scanner;
-
-import cloudbase.core.client.*;
-import cloudbase.core.data.Key;
-import cloudbase.core.data.Range;
-import cloudbase.core.data.Value;
-import cloudbase.core.util.ArgumentChecker;
-import com.google.common.collect.Iterators;
-import org.apache.hadoop.io.Text;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * A BatchScanner that fans out each operation to a list of underlying
- * BatchScanners and returns their results as one concatenated iterator.
- */
-public class BatchScannerList implements BatchScanner {
-  private List<BatchScanner> scanners = new ArrayList<BatchScanner>();
-
-  public BatchScannerList(List<BatchScanner> scanners) {
-    this.scanners = scanners;
-  }
-
-  //apply the ranges to every underlying scanner
-  public void setRanges(Collection<Range> ranges) {
-    ArgumentChecker.notNull(ranges);
-    for (BatchScanner scanner : scanners) {
-      scanner.setRanges(ranges);
-    }
-  }
-
-  public Iterator<Map.Entry<Key, Value>> iterator() {
-    List<Iterator<Map.Entry<Key, Value>>> iterators = new ArrayList<Iterator<Map.Entry<Key, Value>>>();
-    for (BatchScanner scanner : scanners) {
-      iterators.add(scanner.iterator());
-    }
-    return Iterators.concat(iterators.toArray(new Iterator[]{}));
-  }
-
-  public void close() {
-    for (BatchScanner scanner : scanners) {
-      scanner.close();
-    }
-  }
-
-  public void setScanIterators(int i, String s, String s1) throws IOException {
-    for (BatchScanner scanner : scanners) {
-      scanner.setScanIterators(i, s, s1);
-    }
-  }
-
-  public void setScanIteratorOption(String s, String s1, String s2) {
-    for (BatchScanner scanner : scanners) {
-      scanner.setScanIteratorOption(s, s1, s2);
-    }
-  }
-
-  @Override
-  public void setupRegex(String s, int i) throws IOException {
-    // no-op: regex setup is not supported by this composite scanner
-  }
-
-  @Override
-  public void setRowRegex(String s) {
-    // no-op: not supported by this composite scanner
-  }
-
-  @Override
-  public void setColumnFamilyRegex(String s) {
-    // no-op: not supported by this composite scanner
-  }
-
-  @Override
-  public void setColumnQualifierRegex(String s) {
-    // no-op: not supported by this composite scanner
-  }
-
-  @Override
-  public void setValueRegex(String s) {
-    // no-op: not supported by this composite scanner
-  }
-
-  public void fetchColumnFamily(Text cf) {
-    for (BatchScanner scanner : scanners) {
-      scanner.fetchColumnFamily(cf);
-    }
-  }
-
-  public void fetchColumn(Text cf, Text cq) {
-    for (BatchScanner scanner : scanners) {
-      scanner.fetchColumn(cf, cq);
-    }
-  }
-
-  @Override
-  public void clearColumns() {
-    // no-op: not supported by this composite scanner
-  }
-
-  @Override
-  public void clearScanIterators() {
-    // no-op: not supported by this composite scanner
-  }
-
-}
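For context, the removed BatchScannerList let one logical scan fan out across several tables, with the results read back through a single concatenated iterator. A usage sketch against the Cloudbase API exactly as declared above; Cloudbase itself is not publicly available, the table names are invented, and the cloudbase.core.security.Authorizations import is assumed by analogy with the Accumulo package layout:

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

import cloudbase.core.client.BatchScanner;
import cloudbase.core.client.Connector;
import cloudbase.core.data.Key;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.security.Authorizations;

import mvm.rya.cloudbase.utils.scanner.BatchScannerList;

public class BatchScannerListUsage {
  public static void dumpBothTables(Connector conn) throws Exception {
    BatchScanner spo = conn.createBatchScanner("rya_spo", new Authorizations(), 10);
    BatchScanner po = conn.createBatchScanner("rya_po", new Authorizations(), 10);
    BatchScannerList all = new BatchScannerList(Arrays.asList(spo, po));

    all.setRanges(Collections.singleton(new Range())); // one range set drives both scanners
    Iterator<Map.Entry<Key, Value>> it = all.iterator(); // results concatenated across tables
    while (it.hasNext()) {
      Map.Entry<Key, Value> e = it.next();
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
    all.close(); // closes every underlying scanner
  }
}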
