http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java deleted file mode 100644 index 054146d..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java +++ /dev/null @@ -1,383 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.util.Pair; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.pig.LoadFunc; -import org.apache.pig.OrderedLoadFunc; -import org.apache.pig.ResourceSchema; -import org.apache.pig.StoreFuncInterface; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; - -/** - * A LoadStoreFunc for retrieving data from and storing data to 
Accumulo - * <p/> - * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long. - * <p/> - * Tuples can be written in 2 forms: - * (key, colfam, colqual, colvis, value) - * OR - * (key, colfam, colqual, value) - */ -public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, OrderedLoadFunc { - private static final Log logger = LogFactory.getLog(AccumuloStorage.class); - - protected Configuration conf; - protected RecordReader<Key, Value> reader; - protected RecordWriter<Text, Mutation> writer; - - protected String inst; - protected String zookeepers; - protected String user = ""; - protected String password = ""; - protected String table; - protected Text tableName; - protected String auths; - protected Authorizations authorizations = Constants.NO_AUTHS; - protected List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>(); - - protected Collection<Range> ranges = new ArrayList<Range>(); - protected boolean mock = false; - - public AccumuloStorage() { - } - - @Override - public Tuple getNext() throws IOException { - try { - // load the next pair - if (!reader.nextKeyValue()) { - logger.info("Reached end of results"); - return null; - } - - Key key = (Key) reader.getCurrentKey(); - Value value = (Value) reader.getCurrentValue(); - assert key != null && value != null; - - if (logger.isTraceEnabled()) { - logger.trace("Found key[" + key + "] and value[" + value + "]"); - } - - // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(6); - tuple.set(0, new DataByteArray(key.getRow().getBytes())); - tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); - tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); - tuple.set(3, new DataByteArray(key.getColumnVisibility().getBytes())); - tuple.set(4, key.getTimestamp()); - tuple.set(5, new DataByteArray(value.get())); - if (logger.isTraceEnabled()) { - logger.trace("Output tuple[" + tuple + "]"); - } - return tuple; - } catch (InterruptedException e) { - throw new IOException(e.getMessage()); - } - } - - @Override - public InputFormat getInputFormat() { - return new AccumuloInputFormat(); - } - - @Override - public void prepareToRead(RecordReader reader, PigSplit split) { - this.reader = reader; - } - - @Override - public void setLocation(String location, Job job) throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]"); - } - conf = job.getConfiguration(); - setLocationFromUri(location, job); - - if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) { - try { - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes())); - } catch (AccumuloSecurityException e) { - throw new RuntimeException(e); - } - AccumuloInputFormat.setInputTableName(job, table); - AccumuloInputFormat.setScanAuthorizations(job, authorizations); - if (!mock) { - AccumuloInputFormat.setZooKeeperInstance(job, inst, zookeepers); - } else { - AccumuloInputFormat.setMockInstance(job, inst); - } - } - if (columnFamilyColumnQualifierPairs.size() > 0) - AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs); - logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " + - "for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]"); - - if (ranges.size() == 0) { - 
throw new IOException("Accumulo Range must be specified"); - } - AccumuloInputFormat.setRanges(job, ranges); - } - - protected void setLocationFromUri(String uri, Job job) throws IOException { - // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true - try { - if (!uri.startsWith("accumulo://")) - throw new Exception("Bad scheme."); - String[] urlParts = uri.split("\\?"); - setLocationFromUriParts(urlParts); - - } catch (Exception e) { - throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e); - } - } - - protected void setLocationFromUriParts(String[] urlParts) { - String columns = ""; - if (urlParts.length > 1) { - for (String param : urlParts[1].split("&")) { - String[] pair = param.split("="); - if (pair[0].equals("instance")) { - inst = pair[1]; - } else if (pair[0].equals("user")) { - user = pair[1]; - } else if (pair[0].equals("password")) { - password = pair[1]; - } else if (pair[0].equals("zookeepers")) { - zookeepers = pair[1]; - } else if (pair[0].equals("auths")) { - auths = pair[1]; - } else if (pair[0].equals("columns")) { - columns = pair[1]; - } else if (pair[0].equals("range")) { - String[] r = pair[1].split("\\|"); - if (r.length == 2) { - addRange(new Range(r[0], r[1])); - } else { - addRange(new Range(r[0])); - } - } else if (pair[0].equals("mock")) { - this.mock = Boolean.parseBoolean(pair[1]); - } - addLocationFromUriPart(pair); - } - } - String[] parts = urlParts[0].split("/+"); - table = parts[1]; - tableName = new Text(table); - - if (auths == null || auths.equals("")) { - authorizations = new Authorizations(); - } else { - authorizations = new Authorizations(auths.split(",")); - } - - if (!columns.equals("")) { - for (String cfCq : columns.split(",")) { - if (cfCq.contains("|")) { - String[] c = cfCq.split("\\|"); - String cf = c[0]; - String cq = c[1]; - addColumnPair(cf, cq); - } else { - addColumnPair(cfCq, null); - } - } - } - } - - protected void addColumnPair(String cf, String cq) { - columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? 
new Text(cq) : null)); - } - - protected void addLocationFromUriPart(String[] pair) { - - } - - protected void addRange(Range range) { - ranges.add(range); - } - - @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException { - return location; - } - - @Override - public void setUDFContextSignature(String signature) { - - } - - /* StoreFunc methods */ - public void setStoreFuncUDFContextSignature(String signature) { - - } - - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { - return relativeToAbsolutePath(location, curDir); - } - - public void setStoreLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); - setLocationFromUri(location, job); - - if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) { - try { - AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes())); - } catch (AccumuloSecurityException e) { - new RuntimeException(e); - } - AccumuloOutputFormat.setDefaultTableName(job, table); - AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers); - BatchWriterConfig config = new BatchWriterConfig(); - config.setMaxLatency(10, TimeUnit.SECONDS); - config.setMaxMemory(10 * 1000 * 1000); - config.setMaxWriteThreads(10); - AccumuloOutputFormat.setBatchWriterOptions(job, config); - } - } - - public OutputFormat getOutputFormat() { - return new AccumuloOutputFormat(); - } - - public void checkSchema(ResourceSchema schema) throws IOException { - // we don't care about types, they all get casted to ByteBuffers - } - - public void prepareToWrite(RecordWriter writer) { - this.writer = writer; - } - - public void putNext(Tuple t) throws ExecException, IOException { - Mutation mut = new Mutation(objToText(t.get(0))); - Text cf = objToText(t.get(1)); - Text cq = objToText(t.get(2)); - - if (t.size() > 4) { - Text cv = objToText(t.get(3)); - Value val = new Value(objToBytes(t.get(4))); - if (cv.getLength() == 0) { - mut.put(cf, cq, val); - } else { - mut.put(cf, cq, new ColumnVisibility(cv), val); - } - } else { - Value val = new Value(objToBytes(t.get(3))); - mut.put(cf, cq, val); - } - - try { - writer.write(tableName, mut); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - private static Text objToText(Object o) { - return new Text(objToBytes(o)); - } - - private static byte[] objToBytes(Object o) { - if (o instanceof String) { - String str = (String) o; - return str.getBytes(); - } else if (o instanceof Long) { - Long l = (Long) o; - return l.toString().getBytes(); - } else if (o instanceof Integer) { - Integer l = (Integer) o; - return l.toString().getBytes(); - } else if (o instanceof Boolean) { - Boolean l = (Boolean) o; - return l.toString().getBytes(); - } else if (o instanceof Float) { - Float l = (Float) o; - return l.toString().getBytes(); - } else if (o instanceof Double) { - Double l = (Double) o; - return l.toString().getBytes(); - } - - // TODO: handle DataBag, Map<Object, Object>, and Tuple - - return ((DataByteArray) o).get(); - } - - public void cleanupOnFailure(String failure, Job job) { - } - - @Override - public WritableComparable<?> getSplitComparable(InputSplit inputSplit) throws IOException { - //cannot get access to the range directly - AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); - 
rangeInputSplit.write(out); - out.close(); - DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); - Range range = new Range(); - range.readFields(stream); - stream.close(); - return range; - } -}
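
For reference, a minimal usage sketch of the removed AccumuloStorage LoadFunc, assuming the accumulo:// location format parsed by setLocationFromUri() above. The table, instance, ZooKeeper, and credential values are placeholders; per setLocation(), at least one range= parameter is required, and returned tuples have the shape (key, colfam, colqual, colvis, timestamp, value).

    // Sketch only: load raw key/value pairs through the deleted AccumuloStorage class.
    // All connection values are placeholders; setLocation() requires at least one range.
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class AccumuloStorageUsageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.MAPREDUCE);
            pig.registerQuery(
                "raw = LOAD 'accumulo://mytable?instance=myinstance&user=root"
                + "&password=secret&zookeepers=127.0.0.1:2181&auths=PUBLIC&range=a|z'"
                + " USING mvm.rya.accumulo.pig.AccumuloStorage()"
                + " AS (key:bytearray, cf:bytearray, cq:bytearray,"
                + " cv:bytearray, ts:long, val:bytearray);");
            pig.registerQuery("first10 = LIMIT raw 10;");
            // Storing an alias triggers execution of the registered script.
            pig.store("first10", "/tmp/accumulo-storage-sample");
        }
    }
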
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java deleted file mode 100644 index 392c108..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java +++ /dev/null @@ -1,348 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.regex.Pattern; - -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Logger; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.ProjectionElem; -import org.openrdf.query.algebra.ProjectionElemList; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class IndexWritingTool extends Configured implements Tool { - - private static final String sparql_key = "SPARQL.VALUE"; - private static String cardCounter = "count"; - - - public static void main(String[] args) throws Exception { - - 
ToolRunner.run(new Configuration(), new IndexWritingTool(), args); - - } - - @Override - public int run(final String[] args) throws Exception { - Preconditions.checkArgument(args.length == 7, "java " + IndexWritingTool.class.getCanonicalName() - + " hdfsSaveLocation sparqlFile cbinstance cbzk cbuser cbpassword rdfTablePrefix."); - - final String inputDir = args[0]; - final String sparqlFile = args[1]; - final String instStr = args[2]; - final String zooStr = args[3]; - final String userStr = args[4]; - final String passStr = args[5]; - final String tablePrefix = args[6]; - - String sparql = FileUtils.readFileToString(new File(sparqlFile)); - - Job job = new Job(getConf(), "Write HDFS Index to Accumulo"); - job.setJarByClass(this.getClass()); - - Configuration jobConf = job.getConfiguration(); - jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); - setVarOrders(sparql, jobConf); - - TextInputFormat.setInputPaths(job, inputDir); - job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(MyMapper.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Mutation.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Mutation.class); - - job.setNumReduceTasks(0); - - String tableName; - if (zooStr.equals("mock")) { - tableName = tablePrefix; - } else { - tableName = tablePrefix + "INDEX_" + UUID.randomUUID().toString().replace("-", "").toUpperCase(); - } - setAccumuloOutput(instStr, zooStr, userStr, passStr, job, tableName); - - jobConf.set(sparql_key, sparql); - - int complete = job.waitForCompletion(true) ? 0 : -1; - - if (complete == 0) { - - String[] varOrders = jobConf.getStrings("varOrders"); - String orders = Joiner.on("\u0000").join(varOrders); - Instance inst; - - if (zooStr.equals("mock")) { - inst = new MockInstance(instStr); - } else { - inst = new ZooKeeperInstance(instStr, zooStr); - } - - Connector conn = inst.getConnector(userStr, passStr.getBytes()); - BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1); - - Counters counters = job.getCounters(); - Counter c1 = counters.findCounter(cardCounter, cardCounter); - - Mutation m = new Mutation("~SPARQL"); - Value v = new Value(sparql.getBytes()); - m.put(new Text("" + c1.getValue()), new Text(orders), v); - bw.addMutation(m); - - bw.close(); - - return complete; - } else { - return complete; - } - - - } - - - public void setVarOrders(String s, Configuration conf) throws MalformedQueryException { - - SPARQLParser parser = new SPARQLParser(); - TupleExpr query = parser.parseQuery(s, null).getTupleExpr(); - - List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames()); - String projElems = Joiner.on(";").join(projList); - conf.set("projElems", projElems); - - Pattern splitPattern1 = Pattern.compile("\n"); - Pattern splitPattern2 = Pattern.compile(","); - String[] lines = splitPattern1.split(s); - - List<String> varOrders = Lists.newArrayList(); - List<String> varOrderPos = Lists.newArrayList(); - - int orderNum = 0; - int projSizeSq = projList.size()*projList.size(); - - for (String t : lines) { - - - if(orderNum > projSizeSq){ - break; - } - - String[] order = null; - if (t.startsWith("#prefix")) { - t = t.substring(7).trim(); - order = splitPattern2.split(t, projList.size()); - } - - - String tempVarOrder = ""; - String tempVarOrderPos = ""; - - if (order != null) { - for (String u : order) { - if (tempVarOrder.length() == 0) { - tempVarOrder = u.trim(); - } else { - tempVarOrder = tempVarOrder + ";" + 
u.trim(); - } - int pos = projList.indexOf(u.trim()); - if (pos < 0) { - throw new IllegalArgumentException("Invalid variable order!"); - } else { - if (tempVarOrderPos.length() == 0) { - tempVarOrderPos = tempVarOrderPos + pos; - } else { - tempVarOrderPos = tempVarOrderPos + ";" + pos; - } - } - } - - varOrders.add(tempVarOrder); - varOrderPos.add(tempVarOrderPos); - } - - if(tempVarOrder.length() > 0) { - orderNum++; - } - - } - - if(orderNum == 0) { - varOrders.add(projElems); - String tempVarPos = ""; - - for(int i = 0; i < projList.size(); i++) { - if(i == 0) { - tempVarPos = Integer.toString(0); - } else { - tempVarPos = tempVarPos + ";" + i; - } - } - varOrderPos.add(tempVarPos); - - } - - String[] vOrders = varOrders.toArray(new String[varOrders.size()]); - String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]); - - - - conf.setStrings("varOrders", vOrders); - conf.setStrings("varOrderPos", vOrderPos); - - } - - - private static void setAccumuloOutput(String instStr, String zooStr, String userStr, String passStr, Job job, String tableName) - throws AccumuloSecurityException { - - AuthenticationToken token = new PasswordToken(passStr); - AccumuloOutputFormat.setConnectorInfo(job, userStr, token); - AccumuloOutputFormat.setDefaultTableName(job, tableName); - AccumuloOutputFormat.setCreateTables(job, true); - //TODO best way to do this? - - if (zooStr.equals("mock")) { - AccumuloOutputFormat.setMockInstance(job, instStr); - } else { - AccumuloOutputFormat.setZooKeeperInstance(job, instStr, zooStr); - } - - job.setOutputFormatClass(AccumuloOutputFormat.class); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Mutation.class); - } - - public static class MyMapper extends Mapper<LongWritable, Text, Text, Mutation> { - - private static final Logger logger = Logger.getLogger(MyMapper.class); - final static Text EMPTY_TEXT = new Text(); - final static Value EMPTY_VALUE = new Value(new byte[] {}); - private String[] varOrderPos = null; - private String[] projElem = null; - private Pattern splitPattern = null; - private List<List<Integer>> varPositions = Lists.newArrayList(); - - - - @Override - protected void setup(Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException, - InterruptedException { - - Configuration conf = context.getConfiguration(); - - varOrderPos = conf.getStrings("varOrderPos"); - splitPattern = Pattern.compile("\t"); - - for (String s : varOrderPos) { - String[] pos = s.split(";"); - List<Integer> intPos = Lists.newArrayList(); - int i = 0; - for(String t: pos) { - i = Integer.parseInt(t); - intPos.add(i); - } - - varPositions.add(intPos); - - } - - projElem = conf.get("projElems").split(";"); - - super.setup(context); - } - - - - - - - @Override - public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException { - - String[] result = splitPattern.split(value.toString()); - - - for (List<Integer> list : varPositions) { - - String values = ""; - String vars = ""; - - for (Integer i : list) { - - if (values.length() == 0) { - values = result[i]; - vars = projElem[i]; - } else { - values = values + "\u0000" + result[i]; - vars = vars + "\u0000" + projElem[i]; - } - - } - Mutation m = new Mutation(new Text(values)); - m.put(new Text(vars), EMPTY_TEXT, EMPTY_VALUE); - output.write(EMPTY_TEXT, m); - - } - output.getCounter(cardCounter, cardCounter).increment(1); - - } - } - -} 
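
The removed IndexWritingTool expected exactly seven arguments, as enforced by the Preconditions check in run(). A hedged invocation sketch follows; every path and connection value below is a placeholder, and the argument order mirrors the usage string in the code.

    // Sketch only: driving the deleted IndexWritingTool through ToolRunner.
    // Argument order follows the usage string in run(); all values are placeholders.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class IndexWritingToolInvocationSketch {
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new Configuration(),
                    new mvm.rya.accumulo.pig.IndexWritingTool(),
                    new String[] {
                            "/user/rya/precomputedJoin", // hdfsSaveLocation: Pig output to index
                            "/tmp/query.sparql",         // sparqlFile backing the index
                            "myinstance",                // cbinstance (Accumulo instance name)
                            "zoo1:2181",                 // cbzk (ZooKeeper hosts, or "mock")
                            "root",                      // cbuser
                            "secret",                    // cbpassword
                            "rya_"                       // rdfTablePrefix
                    });
            System.exit(exitCode);
        }
    }
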
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java deleted file mode 100644 index ed8134d..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java +++ /dev/null @@ -1,268 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import com.google.common.base.Preconditions; -import com.google.common.io.ByteStreams; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRdfEvalStatsDAO; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer; -import mvm.rya.rdftriplestore.evaluation.QueryJoinOptimizer; -import mvm.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics; -import mvm.rya.rdftriplestore.inference.InferenceEngine; -import mvm.rya.rdftriplestore.inference.InverseOfVisitor; -import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor; -import mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.openrdf.query.algebra.QueryRoot; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.QueryParser; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; -import java.io.IOException; - -/** - * Created by IntelliJ IDEA. - * Date: 4/23/12 - * Time: 9:31 AM - * To change this template use File | Settings | File Templates. 
- */ -public class SparqlQueryPigEngine { - private static final Log logger = LogFactory.getLog(SparqlQueryPigEngine.class); - - private String hadoopDir; - private ExecType execType = ExecType.MAPREDUCE; //default to mapreduce - private boolean inference = true; - private boolean stats = true; - private SparqlToPigTransformVisitor sparqlToPigTransformVisitor; - private PigServer pigServer; - private InferenceEngine inferenceEngine = null; - private RdfCloudTripleStoreEvaluationStatistics rdfCloudTripleStoreEvaluationStatistics; - private AccumuloRyaDAO ryaDAO; - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - - private AccumuloRdfEvalStatsDAO rdfEvalStatsDAO; - - public AccumuloRdfConfiguration getConf() { - return conf; - } - - public void setConf(AccumuloRdfConfiguration conf) { - this.conf = conf; - } - - public void init() throws Exception { - Preconditions.checkNotNull(sparqlToPigTransformVisitor, "Sparql To Pig Transform Visitor must not be null"); - logger.info("Initializing Sparql Query Pig Engine"); - if (hadoopDir != null) { - //set hadoop dir property - System.setProperty("HADOOPDIR", hadoopDir); - } - //TODO: Maybe have validation of the HadoopDir system property - - if (pigServer == null) { - pigServer = new PigServer(execType); - } - - if (inference || stats) { - String instance = sparqlToPigTransformVisitor.getInstance(); - String zoo = sparqlToPigTransformVisitor.getZk(); - String user = sparqlToPigTransformVisitor.getUser(); - String pass = sparqlToPigTransformVisitor.getPassword(); - - Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes()); - - String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix(); - conf.setTablePrefix(tablePrefix); - if (inference) { - logger.info("Using inference"); - inferenceEngine = new InferenceEngine(); - ryaDAO = new AccumuloRyaDAO(); - ryaDAO.setConf(conf); - ryaDAO.setConnector(connector); - ryaDAO.init(); - - inferenceEngine.setRyaDAO(ryaDAO); - inferenceEngine.setConf(conf); - inferenceEngine.setSchedule(false); - inferenceEngine.init(); - } - if (stats) { - logger.info("Using stats"); - rdfEvalStatsDAO = new AccumuloRdfEvalStatsDAO(); - rdfEvalStatsDAO.setConf(conf); - rdfEvalStatsDAO.setConnector(connector); -// rdfEvalStatsDAO.setEvalTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - rdfEvalStatsDAO.init(); - rdfCloudTripleStoreEvaluationStatistics = new RdfCloudTripleStoreEvaluationStatistics(conf, rdfEvalStatsDAO); - } - } - } - - public void destroy() throws Exception { - logger.info("Shutting down Sparql Query Pig Engine"); - pigServer.shutdown(); - if (ryaDAO != null) { - ryaDAO.destroy(); - } - if (inferenceEngine != null) { - inferenceEngine.destroy(); - } - if (rdfEvalStatsDAO != null) { - rdfEvalStatsDAO.destroy(); - } - } - - /** - * Transform a sparql query into a pig script and execute it. 
Save results in hdfsSaveLocation - * - * @param sparql to execute - * @param hdfsSaveLocation to save the execution - * @throws java.io.IOException - */ - public void runQuery(String sparql, String hdfsSaveLocation) throws IOException { - Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); - Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null"); - logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]"); - pigServer.deleteFile(hdfsSaveLocation); - try { - String pigScript = generatePigScript(sparql); - if (logger.isDebugEnabled()) { - logger.debug("Pig script [" + pigScript + "]"); - } - pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes())); - pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant - } catch (Exception e) { - throw new IOException(e); - } - } - - public String generatePigScript(String sparql) throws Exception { - Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); - QueryParser parser = new SPARQLParser(); - ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr()); - -// SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer(); -// similarVarJoinOptimizer.optimize(tupleExpr, null, null); - - if (inference || stats) { - if (inference) { - tupleExpr.visit(new TransitivePropertyVisitor(conf, inferenceEngine)); - tupleExpr.visit(new SymmetricPropertyVisitor(conf, inferenceEngine)); - tupleExpr.visit(new InverseOfVisitor(conf, inferenceEngine)); - } - if (stats) { - (new QueryJoinOptimizer(rdfCloudTripleStoreEvaluationStatistics)).optimize(tupleExpr, null, null); - } - } - - sparqlToPigTransformVisitor.meet(tupleExpr); - return sparqlToPigTransformVisitor.getPigScript(); - } - - - public static void main(String[] args) { - try { - Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk cbuser cbpassword rdfTablePrefix.\n " + - "Sample command: java -cp java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf mvm.rya.accumulo.pig.SparqlQueryPigEngine " + - "tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_"); - String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0]))); - String hdfsSaveLocation = args[1]; - SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor(); - visitor.setTablePrefix(args[6]); - visitor.setInstance(args[2]); - visitor.setZk(args[3]); - visitor.setUser(args[4]); - visitor.setPassword(args[5]); - - SparqlQueryPigEngine engine = new SparqlQueryPigEngine(); - engine.setSparqlToPigTransformVisitor(visitor); - engine.setInference(false); - engine.setStats(false); - - engine.init(); - - engine.runQuery(sparql, hdfsSaveLocation); - - engine.destroy(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public String getHadoopDir() { - return hadoopDir; - } - - public void setHadoopDir(String hadoopDir) { - this.hadoopDir = hadoopDir; - } - - public PigServer getPigServer() { - return pigServer; - } - - public void setPigServer(PigServer pigServer) { - this.pigServer = pigServer; - } - - public ExecType getExecType() { - return execType; - } - - public void setExecType(ExecType execType) { - this.execType = execType; - } - - public boolean isInference() { - return inference; - } - - public void 
setInference(boolean inference) { - this.inference = inference; - } - - public boolean isStats() { - return stats; - } - - public void setStats(boolean stats) { - this.stats = stats; - } - - public SparqlToPigTransformVisitor getSparqlToPigTransformVisitor() { - return sparqlToPigTransformVisitor; - } - - public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) { - this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java deleted file mode 100644 index 38d8adb..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java +++ /dev/null @@ -1,345 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.openrdf.model.Literal; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.query.algebra.*; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; - -import java.util.*; - -/** - * Created by IntelliJ IDEA. - * Date: 4/12/12 - * Time: 10:17 AM - * To change this template use File | Settings | File Templates. 
- */ -public class SparqlToPigTransformVisitor extends QueryModelVisitorBase<RuntimeException> { - private StringBuilder pigScriptBuilder = new StringBuilder(); - private String tablePrefix; - private String instance, zk, user, password; //TODO: use a Configuration object to get these - - private Map<String, String> varToSet = new HashMap<String, String>(); - private Map<TupleExpr, List<String>> exprToNames = new HashMap<TupleExpr, List<String>>(); - private Map<TupleExpr, String> exprToVar = new HashMap<TupleExpr, String>(); - - private char i = 'A'; //TODO: do better, hack - - public SparqlToPigTransformVisitor() { - pigScriptBuilder.append("set pig.splitCombination false;\n") - .append("set default_parallel 32;\n") //TODO: set parallel properly - .append("set mapred.map.tasks.speculative.execution false;\n") - .append("set mapred.reduce.tasks.speculative.execution false;\n") - .append("set io.sort.mb 256;\n") - .append("set mapred.child.java.opts -Xmx2048m;\n") - .append("set mapred.compress.map.output true;\n") - .append("set mapred.map.output.compression.codec org.apache.hadoop.io.compress.GzipCodec;\n") - .append("set io.file.buffer.size 65536;\n") - .append("set io.sort.factor 25;\n"); - } - - @Override - public void meet(StatementPattern node) throws RuntimeException { - super.meet(node); - String subjValue = getVarValue(node.getSubjectVar()); - String predValue = getVarValue(node.getPredicateVar()); - String objValue = getVarValue(node.getObjectVar()); - - String subj = i + "_s"; - String pred = i + "_p"; - String obj = i + "_o"; - String var = i + ""; - if (node.getSubjectVar().getValue() == null) { //TODO: look nicer - subj = node.getSubjectVar().getName(); - varToSet.put(subj, var); - - addToExprToNames(node, subj); - } - if (node.getPredicateVar().getValue() == null) { //TODO: look nicer - pred = node.getPredicateVar().getName(); - varToSet.put(pred, var); - - addToExprToNames(node, pred); - } - if (node.getObjectVar().getValue() == null) { //TODO: look nicer - obj = node.getObjectVar().getName(); - varToSet.put(obj, var); - - addToExprToNames(node, obj); - } - if (node.getContextVar() != null && node.getContextVar().getValue() == null) { - String cntxtName = node.getContextVar().getName(); - varToSet.put(cntxtName, var); - - addToExprToNames(node, cntxtName); - } - //load 'l_' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('<http://www.Department0.University0.edu>', '', '', - // 'stratus', 'stratus13:2181', 'root', 'password') AS (dept:chararray, p:chararray, univ:chararray); -// pigScriptBuilder.append(i).append(" = load '").append(tablePrefix).append("' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('") -// .append(subjValue).append("','").append(predValue).append("','").append(objValue).append("','").append(instance).append("','") -// .append(zk).append("','").append(user).append("','").append(password).append("') AS (").append(subj).append(":chararray, ") -// .append(pred).append(":chararray, ").append(obj).append(":chararray);\n"); - - //load 'cloudbase://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c' - //using mvm.rya.accumulo.pig.StatementPatternStorage() AS (dept:chararray, p:chararray, univ:chararray); - pigScriptBuilder.append(i).append(" = load 'accumulo://").append(tablePrefix).append("?instance=").append(instance).append("&user=").append(user) - .append("&password=").append(password).append("&zookeepers=").append(zk); - if (subjValue != null && 
subjValue.length() > 0) { - pigScriptBuilder.append("&subject=").append(subjValue); - } - if (predValue != null && predValue.length() > 0) { - pigScriptBuilder.append("&predicate=").append(predValue); - } - if (objValue != null && objValue.length() > 0) { - pigScriptBuilder.append("&object=").append(objValue); - } - if (node.getContextVar() != null && node.getContextVar().getValue() != null) { - pigScriptBuilder.append("&context=").append(getVarValue(node.getContextVar())); - } - - pigScriptBuilder.append("' using ").append(StatementPatternStorage.class.getName()).append("() AS (").append(subj).append(":chararray, ") - .append(pred).append(":chararray, ").append(obj).append(":chararray"); - if (node.getContextVar() != null) { - Value cntxtValue = node.getContextVar().getValue(); - String cntxtName = null; - if (cntxtValue == null) { - //use name - cntxtName = node.getContextVar().getName(); - } else { - cntxtName = i + "_c"; - } - pigScriptBuilder.append(", ").append(cntxtName).append(":chararray"); - } - pigScriptBuilder.append(");\n"); - //TODO: add auths - - exprToVar.put(node, var); - i++; - } - - private void addToExprToNames(TupleExpr node, String name) { - List<String> names = exprToNames.get(node); - if (names == null) { - names = new ArrayList<String>(); - exprToNames.put(node, names); - } - names.add(name); - } - - @Override - public void meet(Union node) throws RuntimeException { - super.meet(node); - - TupleExpr leftArg = node.getLeftArg(); - TupleExpr rightArg = node.getRightArg(); - String left_var = exprToVar.get(leftArg); - String right_var = exprToVar.get(rightArg); - //Q = UNION ONSCHEMA B, P; - pigScriptBuilder.append(i).append(" = UNION ONSCHEMA ").append(left_var).append(", ").append(right_var).append(";\n"); - - String unionVar = i + ""; - List<String> left_names = exprToNames.get(leftArg); - List<String> right_names = exprToNames.get(rightArg); - for (String name : left_names) { - varToSet.put(name, unionVar); - addToExprToNames(node, name); - } - for (String name : right_names) { - varToSet.put(name, unionVar); - addToExprToNames(node, name); - } - exprToVar.put(node, unionVar); - i++; - } - - @Override - public void meet(Join node) throws RuntimeException { - super.meet(node); - - TupleExpr leftArg = node.getLeftArg(); - TupleExpr rightArg = node.getRightArg(); - List<String> left_names = exprToNames.get(leftArg); - List<String> right_names = exprToNames.get(rightArg); - - Set<String> joinNames = new HashSet<String>(left_names); - joinNames.retainAll(right_names); //intersection, this is what I join on - //SEC = join FIR by (MEMB_OF::ugrad, SUBORG_J::univ), UGRADDEG by (ugrad, univ); - StringBuilder joinStr = new StringBuilder(); - joinStr.append("("); - boolean first = true; - for (String name : joinNames) { //TODO: Make this a utility method - if (!first) { - joinStr.append(","); - } - first = false; - joinStr.append(name); - } - joinStr.append(")"); - - String left_var = exprToVar.get(leftArg); - String right_var = exprToVar.get(rightArg); - if (joinStr.length() <= 2) { - //no join params, need to cross - pigScriptBuilder.append(i).append(" = cross ").append(left_var).append(", ").append(right_var).append(";\n"); - } else { - //join - pigScriptBuilder.append(i).append(" = join ").append(left_var); - pigScriptBuilder.append(" by ").append(joinStr); - pigScriptBuilder.append(", ").append(right_var); - pigScriptBuilder.append(" by ").append(joinStr); - pigScriptBuilder.append(";\n"); - - } - - String joinVarStr = i + ""; - i++; - // D = foreach C GENERATE A::subj 
AS subj:chararray, A::A_p AS p:chararray; - String forEachVarStr = i + ""; - pigScriptBuilder.append(i).append(" = foreach ").append(joinVarStr).append(" GENERATE "); - Map<String, String> nameToJoinName = new HashMap<String, String>(); - for (String name : left_names) { - varToSet.put(name, forEachVarStr); - addToExprToNames(node, name); - nameToJoinName.put(name, left_var + "::" + name); - } - for (String name : right_names) { - varToSet.put(name, forEachVarStr); - addToExprToNames(node, name); - nameToJoinName.put(name, right_var + "::" + name); - } - - first = true; - for (Map.Entry entry : nameToJoinName.entrySet()) { - if (!first) { - pigScriptBuilder.append(","); - } - first = false; - pigScriptBuilder.append(entry.getValue()).append(" AS ").append(entry.getKey()).append(":chararray "); - } - pigScriptBuilder.append(";\n"); - - exprToVar.put(node, forEachVarStr); - i++; - } - - @Override - public void meet(Projection node) throws RuntimeException { - super.meet(node); - ProjectionElemList list = node.getProjectionElemList(); - String set = null; - StringBuilder projList = new StringBuilder(); - boolean first = true; - //TODO: we do not support projections from multiple pig statements yet - for (String name : list.getTargetNames()) { - set = varToSet.get(name); //TODO: overwrite - if (set == null) { - throw new IllegalArgumentException("Have not found any pig logic for name[" + name + "]"); - } - if (!first) { - projList.append(","); - } - first = false; - projList.append(name); - } - if (set == null) - throw new IllegalArgumentException(""); //TODO: Fill this - //SUBORG = FOREACH SUBORG_L GENERATE dept, univ; - pigScriptBuilder.append("PROJ = FOREACH ").append(set).append(" GENERATE ").append(projList.toString()).append(";\n"); - } - - @Override - public void meet(Slice node) throws RuntimeException { - super.meet(node); - long limit = node.getLimit(); - //PROJ = LIMIT PROJ 10; - pigScriptBuilder.append("PROJ = LIMIT PROJ ").append(limit).append(";\n"); - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getZk() { - return zk; - } - - public void setZk(String zk) { - this.zk = zk; - } - - public String getInstance() { - return instance; - } - - public void setInstance(String instance) { - this.instance = instance; - } - - public String getTablePrefix() { - return tablePrefix; - } - - public void setTablePrefix(String tablePrefix) { - this.tablePrefix = tablePrefix; - } - - public String getPigScript() { - return pigScriptBuilder.toString(); - } - - protected String getVarValue(Var var) { - if (var == null) { - return ""; - } else { - Value value = var.getValue(); - if (value == null) { - return ""; - } - if (value instanceof URI) { - return "<" + value.stringValue() + ">"; - } - if (value instanceof Literal) { - Literal lit = (Literal) value; - if (lit.getDatatype() == null) { - //string - return "\\'" + value.stringValue() + "\\'"; - } - } - return value.stringValue(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java 
b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java deleted file mode 100644 index 9ec9d45..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java +++ /dev/null @@ -1,304 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.RdfCloudTripleStoreUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.query.strategy.ByteRange; -import mvm.rya.api.query.strategy.TriplePatternStrategy; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.rdftriplestore.inference.InferenceEngine; -import mvm.rya.rdftriplestore.inference.InferenceEngineException; - -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.openrdf.model.Resource; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.model.vocabulary.RDF; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.Var; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.QueryParser; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteStreams; - -/** - */ -public class StatementPatternStorage extends AccumuloStorage { - private static final Log logger = LogFactory.getLog(StatementPatternStorage.class); - protected TABLE_LAYOUT layout; - protected String subject = "?s"; - protected String predicate = "?p"; - protected String object = "?o"; - protected String context; - private Value subject_value; - private Value predicate_value; - private Value object_value; - - private RyaTripleContext ryaContext; - - /** - * whether to turn inferencing on or off - */ - private boolean infer = false; - - public 
StatementPatternStorage() { - if (super.conf != null){ - ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(super.conf)); - } - else { - ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); - } - - } - - private Value getValue(Var subjectVar) { - return subjectVar.hasValue() ? subjectVar.getValue() : null; - } - - @Override - public void setLocation(String location, Job job) throws IOException { - super.setLocation(location, job); - } - - @Override - protected void setLocationFromUri(String uri, Job job) throws IOException { - super.setLocationFromUri(uri, job); - // ex: accumulo://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c&context=c&infer=true - addStatementPatternRange(subject, predicate, object, context); - if (infer) { - addInferredRanges(table, job); - } - - if (layout == null || ranges.size() == 0) - throw new IllegalArgumentException("Range and/or layout is null. Check the query"); - table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table); - tableName = new Text(table); - } - - @Override - protected void addLocationFromUriPart(String[] pair) { - if (pair[0].equals("subject")) { - this.subject = pair[1]; - } else if (pair[0].equals("predicate")) { - this.predicate = pair[1]; - } else if (pair[0].equals("object")) { - this.object = pair[1]; - } else if (pair[0].equals("context")) { - this.context = pair[1]; - } else if (pair[0].equals("infer")) { - this.infer = Boolean.parseBoolean(pair[1]); - } - } - - protected void addStatementPatternRange(String subj, String pred, String obj, String ctxt) throws IOException { - logger.info("Adding statement pattern[subject:" + subj + ", predicate:" + pred + ", object:" + obj + ", context:" + ctxt + "]"); - StringBuilder sparqlBuilder = new StringBuilder(); - sparqlBuilder.append("select * where {\n"); - if (ctxt != null) { - /** - * select * where { - GRAPH ?g { - <http://www.example.org/exampleDocument#Monica> ?p ?o. 
- } - } - */ - sparqlBuilder.append("GRAPH ").append(ctxt).append(" {\n"); - } - sparqlBuilder.append(subj).append(" ").append(pred).append(" ").append(obj).append(".\n"); - if (ctxt != null) { - sparqlBuilder.append("}\n"); - } - sparqlBuilder.append("}\n"); - String sparql = sparqlBuilder.toString(); - - if (logger.isDebugEnabled()) { - logger.debug("Sparql statement range[" + sparql + "]"); - } - - QueryParser parser = new SPARQLParser(); - ParsedQuery parsedQuery = null; - try { - parsedQuery = parser.parseQuery(sparql, null); - } catch (MalformedQueryException e) { - throw new IOException(e); - } - parsedQuery.getTupleExpr().visitChildren(new QueryModelVisitorBase<IOException>() { - @Override - public void meet(StatementPattern node) throws IOException { - Var subjectVar = node.getSubjectVar(); - Var predicateVar = node.getPredicateVar(); - Var objectVar = node.getObjectVar(); - subject_value = getValue(subjectVar); - predicate_value = getValue(predicateVar); - object_value = getValue(objectVar); - Var contextVar = node.getContextVar(); - Map.Entry<TABLE_LAYOUT, Range> temp = createRange(subject_value, predicate_value, object_value); - layout = temp.getKey(); - Range range = temp.getValue(); - addRange(range); - if (contextVar != null && contextVar.getValue() != null) { - String context_str = contextVar.getValue().stringValue(); - addColumnPair(context_str, ""); - } - } - }); - } - - protected Map.Entry<TABLE_LAYOUT, Range> createRange(Value s_v, Value p_v, Value o_v) throws IOException { - RyaURI subject_rya = RdfToRyaConversions.convertResource((Resource) s_v); - RyaURI predicate_rya = RdfToRyaConversions.convertURI((URI) p_v); - RyaType object_rya = RdfToRyaConversions.convertValue(o_v); - TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null); - if (strategy == null) - return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range()); - Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null); - ByteRange byteRange = entry.getValue(); - return new RdfCloudTripleStoreUtils.CustomEntry<mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT, Range>( - entry.getKey(), new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())) - ); - } - - protected void addInferredRanges(String tablePrefix, Job job) throws IOException { - logger.info("Adding inferences to statement pattern[subject:" + subject_value + ", predicate:" + predicate_value + ", object:" + object_value + "]"); - //inference engine - AccumuloRyaDAO ryaDAO = new AccumuloRyaDAO(); - InferenceEngine inferenceEngine = new InferenceEngine(); - try { - AccumuloRdfConfiguration rdfConf = new AccumuloRdfConfiguration(job.getConfiguration()); - rdfConf.setTablePrefix(tablePrefix); - ryaDAO.setConf(rdfConf); - try { - if (!mock) { - ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes())); - } else { - ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes())); - } - } catch (Exception e) { - throw new IOException(e); - } - ryaDAO.init(); - inferenceEngine.setConf(rdfConf); - inferenceEngine.setRyaDAO(ryaDAO); - inferenceEngine.setSchedule(false); - inferenceEngine.init(); - //is it subclassof or subpropertyof - if (RDF.TYPE.equals(predicate_value)) { - //try subclassof - Collection<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubClassOfGraph(), (URI) object_value); - if (parents != null && 
parents.size() > 0) { - //subclassof relationships found - //don't add self, that will happen anyway later - //add all relationships - for (URI parent : parents) { - Map.Entry<TABLE_LAYOUT, Range> temp = - createRange(subject_value, predicate_value, parent); - Range range = temp.getValue(); - if (logger.isDebugEnabled()) { - logger.debug("Found subClassOf relationship [type:" + object_value + " is subClassOf:" + parent + "]"); - } - addRange(range); - } - } - } else if (predicate_value != null) { - //subpropertyof check - Set<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubPropertyOfGraph(), (URI) predicate_value); - for (URI parent : parents) { - Map.Entry<TABLE_LAYOUT, Range> temp = - createRange(subject_value, parent, object_value); - Range range = temp.getValue(); - if (logger.isDebugEnabled()) { - logger.debug("Found subPropertyOf relationship [type:" + predicate_value + " is subPropertyOf:" + parent + "]"); - } - addRange(range); - } - } - } catch (Exception e) { - logger.error("Exception in adding inferred ranges", e); - throw new IOException(e); - } finally { - if (inferenceEngine != null) { - try { - inferenceEngine.destroy(); - } catch (InferenceEngineException e) { - logger.error("Exception closing InferenceEngine", e); - } - } - if (ryaDAO != null) { - try { - ryaDAO.destroy(); - } catch (RyaDAOException e) { - logger.error("Exception closing ryadao", e); - } - } - } - } - - @Override - public Tuple getNext() throws IOException { - try { - if (reader.nextKeyValue()) { - Key key = (Key) reader.getCurrentKey(); - org.apache.accumulo.core.data.Value value = (org.apache.accumulo.core.data.Value) reader.getCurrentValue(); - ByteArrayDataInput input = ByteStreams.newDataInput(key.getRow().getBytes()); - RyaStatement ryaStatement = ryaContext.deserializeTriple(layout, new TripleRow(key.getRow().getBytes(), - key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes())); - - Tuple tuple = TupleFactory.getInstance().newTuple(4); - tuple.set(0, ryaStatement.getSubject().getData()); - tuple.set(1, ryaStatement.getPredicate().getData()); - tuple.set(2, ryaStatement.getObject().getData()); - tuple.set(3, (ryaStatement.getContext() != null) ? (ryaStatement.getContext().getData()) : (null)); - return tuple; - } - } catch (Exception e) { - throw new IOException(e); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java deleted file mode 100644 index 4b458b6..0000000 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java +++ /dev/null @@ -1,210 +0,0 @@ -package mvm.rya.accumulo.pig.optimizer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.openrdf.query.BindingSet; -import org.openrdf.query.Dataset; -import org.openrdf.query.algebra.*; -import org.openrdf.query.algebra.evaluation.QueryOptimizer; -import org.openrdf.query.algebra.evaluation.impl.EvaluationStatistics; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; -import org.openrdf.query.algebra.helpers.StatementPatternCollector; - -import java.util.*; - -/** - * A query optimizer that re-orders nested Joins according to cardinality, preferring joins that have similar variables. - * - */ -public class SimilarVarJoinOptimizer implements QueryOptimizer { - - protected final EvaluationStatistics statistics; - - public SimilarVarJoinOptimizer() { - this(new EvaluationStatistics()); - } - - public SimilarVarJoinOptimizer(EvaluationStatistics statistics) { - this.statistics = statistics; - } - - /** - * Applies generally applicable optimizations: path expressions are sorted - * from more to less specific. - * - * @param tupleExpr - */ - public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) { - tupleExpr.visit(new JoinVisitor()); - } - - protected class JoinVisitor extends QueryModelVisitorBase<RuntimeException> { - - Set<String> boundVars = new HashSet<String>(); - - @Override - public void meet(LeftJoin leftJoin) { - leftJoin.getLeftArg().visit(this); - - Set<String> origBoundVars = boundVars; - try { - boundVars = new HashSet<String>(boundVars); - boundVars.addAll(leftJoin.getLeftArg().getBindingNames()); - - leftJoin.getRightArg().visit(this); - } finally { - boundVars = origBoundVars; - } - } - - @Override - public void meet(Join node) { - Set<String> origBoundVars = boundVars; - try { - boundVars = new HashSet<String>(boundVars); - - // Recursively get the join arguments - List<TupleExpr> joinArgs = getJoinArgs(node, new ArrayList<TupleExpr>()); - - // Build maps of cardinalities and vars per tuple expression - Map<TupleExpr, Double> cardinalityMap = new HashMap<TupleExpr, Double>(); - - for (TupleExpr tupleExpr : joinArgs) { - double cardinality = statistics.getCardinality(tupleExpr); - cardinalityMap.put(tupleExpr, cardinality); - } - - // Reorder the (recursive) join arguments to a more optimal sequence - List<TupleExpr> orderedJoinArgs = new ArrayList<TupleExpr>(joinArgs.size()); - TupleExpr last = null; - while (!joinArgs.isEmpty()) { - TupleExpr tupleExpr = selectNextTupleExpr(joinArgs, cardinalityMap, last); - if (tupleExpr == null) { - break; - } - - joinArgs.remove(tupleExpr); - orderedJoinArgs.add(tupleExpr); - last = tupleExpr; - - // Recursively optimize join arguments - tupleExpr.visit(this); - - boundVars.addAll(tupleExpr.getBindingNames()); - } - - // Build new join hierarchy - // Note: generated hierarchy is right-recursive to help the - // IterativeEvaluationOptimizer to factor out the left-most join - // argument - int i = 0; - TupleExpr replacement = orderedJoinArgs.get(i); - for (i++; i < orderedJoinArgs.size(); i++) { - replacement = new Join(replacement, orderedJoinArgs.get(i)); - } - - // Replace old join hierarchy - node.replaceWith(replacement); - } finally { - 
boundVars = origBoundVars; - } - } - - protected <L extends List<TupleExpr>> L getJoinArgs(TupleExpr tupleExpr, L joinArgs) { - if (tupleExpr instanceof Join) { - Join join = (Join) tupleExpr; - getJoinArgs(join.getLeftArg(), joinArgs); - getJoinArgs(join.getRightArg(), joinArgs); - } else { - joinArgs.add(tupleExpr); - } - - return joinArgs; - } - - protected List<Var> getStatementPatternVars(TupleExpr tupleExpr) { - if(tupleExpr == null) - return null; - List<StatementPattern> stPatterns = StatementPatternCollector.process(tupleExpr); - List<Var> varList = new ArrayList<Var>(stPatterns.size() * 4); - for (StatementPattern sp : stPatterns) { - sp.getVars(varList); - } - return varList; - } - - protected <M extends Map<Var, Integer>> M getVarFreqMap(List<Var> varList, M varFreqMap) { - for (Var var : varList) { - Integer freq = varFreqMap.get(var); - freq = (freq == null) ? 1 : freq + 1; - varFreqMap.put(var, freq); - } - return varFreqMap; - } - - /** - * Selects from a list of tuple expressions the next tuple expression that - * should be evaluated. This method selects the tuple expression with - * highest number of bound variables, preferring variables that have been - * bound in other tuple expressions over variables with a fixed value. - */ - protected TupleExpr selectNextTupleExpr(List<TupleExpr> expressions, - Map<TupleExpr, Double> cardinalityMap, - TupleExpr last) { - double lowestCardinality = Double.MAX_VALUE; - TupleExpr result = expressions.get(0); - expressions = getExprsWithSameVars(expressions, last); - - for (TupleExpr tupleExpr : expressions) { - // Calculate a score for this tuple expression - double cardinality = cardinalityMap.get(tupleExpr); - - if (cardinality < lowestCardinality) { - // More specific path expression found - lowestCardinality = cardinality; - result = tupleExpr; - } - } - - return result; - } - - protected List<TupleExpr> getExprsWithSameVars(List<TupleExpr> expressions, TupleExpr last) { - if(last == null) - return expressions; - List<TupleExpr> retExprs = new ArrayList<TupleExpr>(); - for(TupleExpr tupleExpr : expressions) { - List<Var> statementPatternVars = getStatementPatternVars(tupleExpr); - List<Var> lastVars = getStatementPatternVars(last); - statementPatternVars.retainAll(lastVars); - if(statementPatternVars.size() > 0) { - retExprs.add(tupleExpr); - } - } - if(retExprs.size() == 0) { - return expressions; - } - return retExprs; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java deleted file mode 100644 index 119ccb1..0000000 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java +++ /dev/null @@ -1,284 +0,0 @@ -package mvm.rya.accumulo.pig; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import junit.framework.TestCase; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.admin.SecurityOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.pig.data.Tuple; - -/** - * Created by IntelliJ IDEA. - * Date: 4/20/12 - * Time: 10:17 AM - * To change this template use File | Settings | File Templates. - */ -public class AccumuloStorageTest extends TestCase { - - private String user = "user"; - private String pwd = "pwd"; - private String instance = "myinstance"; - private String table = "testTable"; - private Authorizations auths = Constants.NO_AUTHS; - private Connector connector; - - @Override - public void setUp() throws Exception { - super.setUp(); - connector = new MockInstance(instance).getConnector(user, new PasswordToken(pwd.getBytes())); - connector.tableOperations().create(table); - SecurityOperations secOps = connector.securityOperations(); - secOps.createLocalUser(user, new PasswordToken(pwd.getBytes())); - secOps.grantTablePermission(user, table, TablePermission.READ); - secOps.grantTablePermission(user, table, TablePermission.WRITE); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - connector.tableOperations().delete(table); - } - - public void testSimpleOutput() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - Mutation row = new Mutation("row"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|z&mock=true"; - AccumuloStorage storage = createAccumuloStorage(location); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(1, count); - } - - public void testRange() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - Mutation row = new Mutation("a"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - row = 
new Mutation("b"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - row = new Mutation("d"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true"; - AccumuloStorage storage = createAccumuloStorage(location); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(2, count); - } - - public void testMultipleRanges() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - Mutation row = new Mutation("a"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - row = new Mutation("b"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - row = new Mutation("d"); - row.put("cf", "cq", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&range=d|e&mock=true"; - List<AccumuloStorage> storages = createAccumuloStorages(location); - assertEquals(2, storages.size()); - AccumuloStorage storage = storages.get(0); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(2, count); - storage = storages.get(1); - count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(1, count); - } - - public void testColumns() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - Mutation row = new Mutation("a"); - row.put("cf1", "cq", new Value(new byte[0])); - row.put("cf2", "cq", new Value(new byte[0])); - row.put("cf3", "cq1", new Value(new byte[0])); - row.put("cf3", "cq2", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&columns=cf1,cf3|cq1&mock=true"; - AccumuloStorage storage = createAccumuloStorage(location); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(2, count); - } - - public void testWholeRowRange() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - Mutation row = new Mutation("a"); - row.put("cf1", "cq", new Value(new byte[0])); - row.put("cf2", "cq", new Value(new byte[0])); - row.put("cf3", "cq1", new Value(new byte[0])); - row.put("cf3", "cq2", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a&mock=true"; - AccumuloStorage storage = createAccumuloStorage(location); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(4, count); - } - - public void testAuths() throws Exception { - BatchWriter batchWriter = connector.createBatchWriter(table, 10l, 10l, 2); - 
Mutation row = new Mutation("a"); - row.put("cf1", "cq1", new ColumnVisibility("A"), new Value(new byte[0])); - row.put("cf2", "cq2", new Value(new byte[0])); - batchWriter.addMutation(row); - batchWriter.flush(); - batchWriter.close(); - - String location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&mock=true"; - AccumuloStorage storage = createAccumuloStorage(location); - int count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(1, count); - - location = "accumulo://" + table + "?instance=" + instance + "&user=" + user + "&password=" + pwd + "&range=a|c&auths=A&mock=true"; - storage = createAccumuloStorage(location); - count = 0; - while (true) { - Tuple next = storage.getNext(); - if (next == null) - break; - assertEquals(6, next.size()); - count++; - } - assertEquals(2, count); - } - - protected AccumuloStorage createAccumuloStorage(String location) throws IOException, InterruptedException { - List<AccumuloStorage> accumuloStorages = createAccumuloStorages(location); - if (accumuloStorages.size() > 0) { - return accumuloStorages.get(0); - } - return null; - } - - protected List<AccumuloStorage> createAccumuloStorages(String location) throws IOException, InterruptedException { - List<AccumuloStorage> accumuloStorages = new ArrayList<AccumuloStorage>(); - AccumuloStorage storage = new AccumuloStorage(); - InputFormat inputFormat = storage.getInputFormat(); - Job job = new Job(new Configuration()); - storage.setLocation(location, job); - List<InputSplit> splits = inputFormat.getSplits(job); - assertNotNull(splits); - - for (InputSplit inputSplit : splits) { - storage = new AccumuloStorage(); - job = new Job(new Configuration()); - storage.setLocation(location, job); - TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration(), - new TaskAttemptID("jtid", 0, false, 0, 0)); - RecordReader recordReader = inputFormat.createRecordReader(inputSplit, - taskAttemptContext); - recordReader.initialize(inputSplit, taskAttemptContext); - - storage.prepareToRead(recordReader, null); - accumuloStorages.add(storage); - } - return accumuloStorages; - } -}
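
For context on how the removed LoadFunc was driven, here is an illustrative standalone sketch (not part of the original patch) that mirrors the createAccumuloStorages helper in the deleted test above. The class name AccumuloStorageReadSketch and the hard-coded location values are invented for illustration; the calls themselves (setLocation, getInputFormat, prepareToRead, getNext) are the ones the test exercises, and the sketch assumes rows were already written to the mock instance the way the tests do with a BatchWriter.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.pig.data.Tuple;

import mvm.rya.accumulo.pig.AccumuloStorage;

public class AccumuloStorageReadSketch {
    public static void main(String[] args) throws Exception {
        // Same location syntax the tests build: table, instance, credentials,
        // an inclusive row range "a|z", and mock=true to target a MockInstance.
        String location = "accumulo://testTable?instance=myinstance&user=user"
                + "&password=pwd&range=a|z&mock=true";

        AccumuloStorage storage = new AccumuloStorage();
        Job job = new Job(new Configuration());
        storage.setLocation(location, job);

        InputFormat<?, ?> inputFormat = storage.getInputFormat();
        for (InputSplit split : inputFormat.getSplits(job)) {
            // Same throwaway task-attempt plumbing the deleted test uses to
            // initialize a RecordReader outside a real MapReduce job.
            TaskAttemptContext ctx = new TaskAttemptContextImpl(job.getConfiguration(),
                    new TaskAttemptID("jtid", 0, false, 0, 0));
            RecordReader<?, ?> reader = inputFormat.createRecordReader(split, ctx);
            reader.initialize(split, ctx);
            storage.prepareToRead(reader, null);

            // The deleted tests only assert the arity here: six fields per tuple.
            Tuple tuple;
            while ((tuple = storage.getNext()) != null) {
                System.out.println(tuple);
            }
        }
    }
}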

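A comparable sketch for the removed SimilarVarJoinOptimizer, again illustrative rather than part of the patch: the query string and the JoinReorderSketch class name are invented, while the parser calls are the same ones the deleted StatementPatternStorage code uses (SPARQLParser.parseQuery, ParsedQuery.getTupleExpr) and the optimizer is invoked through its public optimize(TupleExpr, Dataset, BindingSet) method with null dataset and bindings.

import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;

import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer;

public class JoinReorderSketch {
    public static void main(String[] args) throws MalformedQueryException {
        // Three patterns: the first two share ?o, the third is disconnected.
        String sparql = "SELECT * WHERE { ?s <urn:p1> ?o . ?o <urn:p2> ?x . ?a <urn:p3> ?b }";
        ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
        TupleExpr expr = parsed.getTupleExpr();

        // With the default EvaluationStatistics every pattern has the same
        // cardinality, so ordering falls to getExprsWithSameVars: the ?o/?x
        // pattern is kept next to the ?s/?o pattern it shares a variable with,
        // ahead of the disconnected ?a/?b pattern.
        new SimilarVarJoinOptimizer().optimize(expr, null, null);

        System.out.println(expr);
    }
}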