http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java index a1c84aa..5adb893 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/tools/AccumuloRdfCountTool.java @@ -22,19 +22,9 @@ package org.apache.rya.accumulo.mr.tools; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Date; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRdfConstants; -import org.apache.rya.accumulo.mr.AbstractAccumuloMRTool; -import org.apache.rya.accumulo.mr.MRUtils; -import org.apache.rya.api.RdfCloudTripleStoreConstants; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.resolver.RyaTripleContext; -import org.apache.rya.api.resolver.triple.TripleRow; -import org.apache.rya.api.resolver.triple.TripleRowResolverException; - import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -49,6 +39,16 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRdfConstants; +import org.apache.rya.accumulo.mr.AbstractAccumuloMRTool; +import org.apache.rya.accumulo.mr.MRUtils; +import org.apache.rya.api.RdfCloudTripleStoreConstants; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.resolver.RyaTripleContext; +import org.apache.rya.api.resolver.triple.TripleRow; +import org.apache.rya.api.resolver.triple.TripleRowResolverException; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; @@ -64,13 +64,14 @@ import com.google.common.io.ByteStreams; * Time: 10:39:40 AM * @deprecated */ +@Deprecated public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool { - public static void main(String[] args) { + public static void main(final String[] args) { try { ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); } } @@ -80,13 +81,13 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool */ @Override - public int run(String[] strings) throws Exception { + public int run(final String[] strings) throws Exception { conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics"); //initialize init(); - Job job = new Job(conf); + final Job job = new Job(conf); job.setJarByClass(AccumuloRdfCountTool.class); setupAccumuloInput(job); @@ -102,16 +103,16 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool job.setCombinerClass(CountPiecesCombiner.class); job.setReducerClass(CountPiecesReducer.class); - String outputTable = MRUtils.getTablePrefix(conf) + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX; + final String outputTable = MRUtils.getTablePrefix(conf) + 
RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX; setupAccumuloOutput(job, outputTable); // Submit the job - Date startTime = new Date(); + final Date startTime = new Date(); System.out.println("Job started: " + startTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; + final int exitCode = job.waitForCompletion(true) ? 0 : 1; if (exitCode == 0) { - Date end_time = new Date(); + final Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) / 1000 @@ -131,38 +132,39 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool ValueFactoryImpl vf = new ValueFactoryImpl(); - private Text keyOut = new Text(); - private LongWritable valOut = new LongWritable(1); + private final Text keyOut = new Text(); + private final LongWritable valOut = new LongWritable(1); private RyaTripleContext ryaContext; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(final Context context) throws IOException, InterruptedException { super.setup(context); - Configuration conf = context.getConfiguration(); + final Configuration conf = context.getConfiguration(); tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf( conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString())); ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); } @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { + protected void map(final Key key, final Value value, final Context context) throws IOException, InterruptedException { try { - RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes())); + final RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes())); //count each piece subject, pred, object - String subj = statement.getSubject().getData(); - String pred = statement.getPredicate().getData(); + final String subj = statement.getSubject().getData(); + final String pred = statement.getPredicate().getData(); // byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject()); - RyaURI scontext = statement.getContext(); - boolean includesContext = scontext != null; - String scontext_str = (includesContext) ? scontext.getData() : null; + final RyaURI scontext = statement.getContext(); + final boolean includesContext = scontext != null; + final String scontext_str = (includesContext) ? 
scontext.getData() : null; ByteArrayDataOutput output = ByteStreams.newDataOutput(); output.writeUTF(subj); output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF); output.writeBoolean(includesContext); - if (includesContext) + if (includesContext) { output.writeUTF(scontext_str); + } keyOut.set(output.toByteArray()); context.write(keyOut, valOut); @@ -170,11 +172,12 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool output.writeUTF(pred); output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF); output.writeBoolean(includesContext); - if (includesContext) + if (includesContext) { output.writeUTF(scontext_str); + } keyOut.set(output.toByteArray()); context.write(keyOut, valOut); - } catch (TripleRowResolverException e) { + } catch (final TripleRowResolverException e) { throw new IOException(e); } } @@ -182,21 +185,22 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { - private LongWritable valOut = new LongWritable(); + private final LongWritable valOut = new LongWritable(); // TODO: can still add up to be large I guess // any count lower than this does not need to be saved public static final int TOO_LOW = 2; @Override - protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException { long count = 0; - for (LongWritable lw : values) { + for (final LongWritable lw : values) { count += lw.get(); } - if (count <= TOO_LOW) + if (count <= TOO_LOW) { return; + } valOut.set(count); context.write(key, valOut); @@ -218,38 +222,40 @@ public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(final Context context) throws IOException, InterruptedException { super.setup(context); tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF); table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP); - if (cv_s != null) + if (cv_s != null) { cv = new ColumnVisibility(cv_s); + } } @Override - protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { + protected void reduce(final Text key, final Iterable<LongWritable> values, final Context context) throws IOException, InterruptedException { long count = 0; - for (LongWritable lw : values) { + for (final LongWritable lw : values) { count += lw.get(); } - if (count <= TOO_LOW) + if (count <= TOO_LOW) { return; + } - ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes()); - String v = badi.readUTF(); + final ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes()); + final String v = badi.readUTF(); cat_txt.set(badi.readUTF()); Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT; - boolean includesContext = badi.readBoolean(); + final boolean includesContext = badi.readBoolean(); if (includesContext) { columnQualifier = new Text(badi.readUTF()); } row.set(v); - Mutation m = new Mutation(row); - v_out.set((count + "").getBytes()); + final Mutation m = new 
Mutation(row); + v_out.set((count + "").getBytes(StandardCharsets.UTF_8)); m.put(cat_txt, columnQualifier, cv, v_out); context.write(table, m); }
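
The change running through this commit, visible above in AccumuloRdfCountTool and repeated in each Java file below, is the replacement of platform-default charset conversions such as (count + "").getBytes() with explicit StandardCharsets.UTF_8, alongside mechanical cleanups: final parameters and locals, @Deprecated and @Override annotations, braces on single-statement conditionals, and reordered imports. The sketch below (illustrative only, not code from the commit) shows why the default-charset call is a genuine defect in a distributed job rather than a style point:

    import java.nio.charset.StandardCharsets;

    public class CharsetPitfall {
        public static void main(final String[] args) {
            final long count = 42;

            // Before: the bytes depend on the JVM's file.encoding, so two
            // nodes in the same MapReduce job can serialize the same count
            // value into Accumulo differently.
            final byte[] implicitBytes = (count + "").getBytes();

            // After: identical bytes on every node, matching the commit's
            // v_out.set((count + "").getBytes(StandardCharsets.UTF_8)).
            final byte[] explicitBytes = (count + "").getBytes(StandardCharsets.UTF_8);

            System.out.println(implicitBytes.length + " byte(s) vs "
                    + explicitBytes.length + " byte(s) on this JVM");
        }
    }

Bytes written into Accumulo by one task must deserialize identically on whatever node later reads them; pinning UTF-8 removes the dependency on each JVM's file.encoding setting.
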
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java ---------------------------------------------------------------------- diff --git a/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java b/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java index e570ce5..eba4b3d 100644 --- a/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java +++ b/osgi/camel.rya/src/main/java/org/apache/rya/camel/cbsail/CbSailProducer.java @@ -1,5 +1,18 @@ package org.apache.rya.camel.cbsail; +import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.CONF_INFER; +import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH; +import static org.apache.rya.camel.cbsail.CbSailComponent.SPARQL_QUERY_PROP; +import static org.apache.rya.camel.cbsail.CbSailComponent.valueFactory; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +21,9 @@ package org.apache.rya.camel.cbsail; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,28 +37,27 @@ package org.apache.rya.camel.cbsail; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.openrdf.model.Statement; -import org.openrdf.query.*; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResultHandlerBase; +import org.openrdf.query.TupleQueryResultHandlerException; import org.openrdf.query.resultio.sparqlxml.SPARQLResultsXMLWriter; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; import org.openrdf.rio.RDFHandlerException; -import java.io.ByteArrayOutputStream; -import java.util.*; - -import static org.apache.rya.api.RdfCloudTripleStoreConfiguration.*; -import static org.apache.rya.camel.cbsail.CbSailComponent.SPARQL_QUERY_PROP; -import static org.apache.rya.camel.cbsail.CbSailComponent.valueFactory; - /** */ public class CbSailProducer extends DefaultProducer { private RepositoryConnection connection; - private CbSailEndpoint.CbSailOutput queryOutput = CbSailEndpoint.CbSailOutput.BINARY; + private final CbSailEndpoint.CbSailOutput queryOutput = CbSailEndpoint.CbSailOutput.BINARY; - public CbSailProducer(CbSailEndpoint endpoint) { + public CbSailProducer(final CbSailEndpoint endpoint) { super(endpoint); } @@ -53,78 +65,83 @@ public class CbSailProducer extends DefaultProducer { public void process(final Exchange exchange) throws Exception { //If a query is set in the header or uri, use it Collection<String> queries = new ArrayList<String>(); - Collection tmp = exchange.getIn().getHeader(SPARQL_QUERY_PROP, Collection.class); + final Collection 
tmp = exchange.getIn().getHeader(SPARQL_QUERY_PROP, Collection.class); if (tmp != null) { queries = tmp; } else { - String query = exchange.getIn().getHeader(SPARQL_QUERY_PROP, String.class); + final String query = exchange.getIn().getHeader(SPARQL_QUERY_PROP, String.class); if (query != null) { queries.add(query); } } - if (queries.size() > 0) + if (queries.size() > 0) { sparqlQuery(exchange, queries); - else + } else { inputTriples(exchange); + } } - protected void inputTriples(Exchange exchange) throws RepositoryException { - Object body = exchange.getIn().getBody(); + protected void inputTriples(final Exchange exchange) throws RepositoryException { + final Object body = exchange.getIn().getBody(); if (body instanceof Statement) { //save statement inputStatement((Statement) body); } else if (body instanceof List) { //save list of statements - List lst = (List) body; - for (Object obj : lst) { - if (obj instanceof Statement) + final List lst = (List) body; + for (final Object obj : lst) { + if (obj instanceof Statement) { inputStatement((Statement) obj); + } } } connection.commit(); exchange.getOut().setBody(Boolean.TRUE); } - protected void inputStatement(Statement stmt) throws RepositoryException { + protected void inputStatement(final Statement stmt) throws RepositoryException { connection.add(stmt.getSubject(), stmt.getPredicate(), stmt.getObject()); } - protected void sparqlQuery(Exchange exchange, Collection<String> queries) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException, RDFHandlerException { + protected void sparqlQuery(final Exchange exchange, final Collection<String> queries) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException, RDFHandlerException { - List list = new ArrayList(); - for (String query : queries) { + final List list = new ArrayList(); + for (final String query : queries) { // Long startTime = exchange.getIn().getHeader(START_TIME_QUERY_PROP, Long.class); // Long ttl = exchange.getIn().getHeader(TTL_QUERY_PROP, Long.class); - String auth = exchange.getIn().getHeader(CONF_QUERY_AUTH, String.class); - Boolean infer = exchange.getIn().getHeader(CONF_INFER, Boolean.class); + final String auth = exchange.getIn().getHeader(CONF_QUERY_AUTH, String.class); + final Boolean infer = exchange.getIn().getHeader(CONF_INFER, Boolean.class); - Object output = performSelect(query, auth, infer); + final Object output = performSelect(query, auth, infer); if (queries.size() == 1) { exchange.getOut().setBody(output); return; - } else + } else { list.add(output); + } } exchange.getOut().setBody(list); } - protected Object performSelect(String query, String auth, Boolean infer) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException { - TupleQuery tupleQuery = connection.prepareTupleQuery( + protected Object performSelect(final String query, final String auth, final Boolean infer) throws RepositoryException, MalformedQueryException, QueryEvaluationException, TupleQueryResultHandlerException { + final TupleQuery tupleQuery = connection.prepareTupleQuery( QueryLanguage.SPARQL, query); - if (auth != null && auth.length() > 0) + if (auth != null && auth.length() > 0) { tupleQuery.setBinding(CONF_QUERY_AUTH, valueFactory.createLiteral(auth)); - if (infer != null) + } + if (infer != null) { tupleQuery.setBinding(CONF_INFER, valueFactory.createLiteral(infer)); + } if 
(CbSailEndpoint.CbSailOutput.BINARY.equals(queryOutput)) { final List listOutput = new ArrayList(); - TupleQueryResultHandlerBase handler = new TupleQueryResultHandlerBase() { + final TupleQueryResultHandlerBase handler = new TupleQueryResultHandlerBase() { @Override - public void handleSolution(BindingSet bindingSet) throws TupleQueryResultHandlerException { - Map<String, String> map = new HashMap<String, String>(); - for (String s : bindingSet.getBindingNames()) { + public void handleSolution(final BindingSet bindingSet) throws TupleQueryResultHandlerException { + final Map<String, String> map = new HashMap<String, String>(); + for (final String s : bindingSet.getBindingNames()) { map.put(s, bindingSet.getBinding(s).getValue().stringValue()); } listOutput.add(map); @@ -133,10 +150,10 @@ public class CbSailProducer extends DefaultProducer { tupleQuery.evaluate(handler); return listOutput; } else if (CbSailEndpoint.CbSailOutput.XML.equals(queryOutput)) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - SPARQLResultsXMLWriter sparqlWriter = new SPARQLResultsXMLWriter(baos); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final SPARQLResultsXMLWriter sparqlWriter = new SPARQLResultsXMLWriter(baos); tupleQuery.evaluate(sparqlWriter); - return new String(baos.toByteArray()); + return new String(baos.toByteArray(), StandardCharsets.UTF_8); } else { throw new IllegalArgumentException("Query Output[" + queryOutput + "] is not recognized"); } @@ -164,7 +181,7 @@ public class CbSailProducer extends DefaultProducer { @Override protected void doStart() throws Exception { - CbSailEndpoint cbSailEndpoint = (CbSailEndpoint) getEndpoint(); + final CbSailEndpoint cbSailEndpoint = (CbSailEndpoint) getEndpoint(); connection = cbSailEndpoint.getSailRepository().getConnection(); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java index ac151d9..c97c717 100644 --- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java +++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/AccumuloStorage.java @@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -34,8 +35,8 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -110,8 +111,8 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord return null; } - Key key = (Key) reader.getCurrentKey(); - Value value = (Value) reader.getCurrentValue(); + final Key key = reader.getCurrentKey(); + final Value value = reader.getCurrentValue(); assert key != null && value != null; if (logger.isTraceEnabled()) { @@ -119,7 +120,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(6); + final Tuple tuple = TupleFactory.getInstance().newTuple(6); tuple.set(0, new DataByteArray(key.getRow().getBytes())); tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); @@ -130,7 +131,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord logger.trace("Output tuple[" + tuple + "]"); } return tuple; - } catch (InterruptedException e) { + } catch (final InterruptedException e) { throw new IOException(e.getMessage()); } } @@ -141,12 +142,12 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } @Override - public void prepareToRead(RecordReader reader, PigSplit split) { + public void prepareToRead(final RecordReader reader, final PigSplit split) { this.reader = reader; } @Override - public void setLocation(String location, Job job) throws IOException { + public void setLocation(final String location, final Job job) throws IOException { if (logger.isDebugEnabled()) { logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]"); } @@ -155,8 +156,8 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord if (!ConfiguratorBase.isConnectorInfoSet(AccumuloInputFormat.class, conf)) { try { - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes())); - } catch (AccumuloSecurityException e) { + AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes(StandardCharsets.UTF_8))); + } catch (final AccumuloSecurityException e) { throw new RuntimeException(e); } AccumuloInputFormat.setInputTableName(job, table); @@ -167,8 +168,9 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord AccumuloInputFormat.setMockInstance(job, inst); } } - if 
(columnFamilyColumnQualifierPairs.size() > 0) + if (columnFamilyColumnQualifierPairs.size() > 0) { AccumuloInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs); + } logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " + "for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]"); @@ -178,24 +180,25 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord AccumuloInputFormat.setRanges(job, ranges); } - protected void setLocationFromUri(String uri, Job job) throws IOException { + protected void setLocationFromUri(final String uri, final Job job) throws IOException { // ex: accumulo://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true try { - if (!uri.startsWith("accumulo://")) + if (!uri.startsWith("accumulo://")) { throw new Exception("Bad scheme."); - String[] urlParts = uri.split("\\?"); + } + final String[] urlParts = uri.split("\\?"); setLocationFromUriParts(urlParts); - } catch (Exception e) { + } catch (final Exception e) { throw new IOException("Expected 'accumulo://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e); } } - protected void setLocationFromUriParts(String[] urlParts) { + protected void setLocationFromUriParts(final String[] urlParts) { String columns = ""; if (urlParts.length > 1) { - for (String param : urlParts[1].split("&")) { - String[] pair = param.split("="); + for (final String param : urlParts[1].split("&")) { + final String[] pair = param.split("="); if (pair[0].equals("instance")) { inst = pair[1]; } else if (pair[0].equals("user")) { @@ -209,7 +212,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } else if (pair[0].equals("columns")) { columns = pair[1]; } else if (pair[0].equals("range")) { - String[] r = pair[1].split("\\|"); + final String[] r = pair[1].split("\\|"); if (r.length == 2) { addRange(new Range(r[0], r[1])); } else { @@ -221,7 +224,7 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord addLocationFromUriPart(pair); } } - String[] parts = urlParts[0].split("/+"); + final String[] parts = urlParts[0].split("/+"); table = parts[1]; tableName = new Text(table); @@ -232,11 +235,11 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } if (!columns.equals("")) { - for (String cfCq : columns.split(",")) { + for (final String cfCq : columns.split(",")) { if (cfCq.contains("|")) { - String[] c = cfCq.split("\\|"); - String cf = c[0]; - String cq = c[1]; + final String[] c = cfCq.split("\\|"); + final String cf = c[0]; + final String cq = c[1]; addColumnPair(cf, cq); } else { addColumnPair(cfCq, null); @@ -245,50 +248,53 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } } - protected void addColumnPair(String cf, String cq) { + protected void addColumnPair(final String cf, final String cq) { columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? 
new Text(cq) : null)); } - protected void addLocationFromUriPart(String[] pair) { + protected void addLocationFromUriPart(final String[] pair) { } - protected void addRange(Range range) { + protected void addRange(final Range range) { ranges.add(range); } @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException { + public String relativeToAbsolutePath(final String location, final Path curDir) throws IOException { return location; } @Override - public void setUDFContextSignature(String signature) { + public void setUDFContextSignature(final String signature) { } /* StoreFunc methods */ - public void setStoreFuncUDFContextSignature(String signature) { + @Override + public void setStoreFuncUDFContextSignature(final String signature) { } - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { + @Override + public String relToAbsPathForStoreLocation(final String location, final Path curDir) throws IOException { return relativeToAbsolutePath(location, curDir); } - public void setStoreLocation(String location, Job job) throws IOException { + @Override + public void setStoreLocation(final String location, final Job job) throws IOException { conf = job.getConfiguration(); setLocationFromUri(location, job); if (!conf.getBoolean(AccumuloOutputFormat.class.getSimpleName() + ".configured", false)) { try { - AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes())); - } catch (AccumuloSecurityException e) { + AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(password.getBytes(StandardCharsets.UTF_8))); + } catch (final AccumuloSecurityException e) { throw new RuntimeException(e); } AccumuloOutputFormat.setDefaultTableName(job, table); AccumuloOutputFormat.setZooKeeperInstance(job, inst, zookeepers); - BatchWriterConfig config = new BatchWriterConfig(); + final BatchWriterConfig config = new BatchWriterConfig(); config.setMaxLatency(10, TimeUnit.SECONDS); config.setMaxMemory(10 * 1000 * 1000); config.setMaxWriteThreads(10); @@ -296,66 +302,70 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord } } + @Override public OutputFormat getOutputFormat() { return new AccumuloOutputFormat(); } - public void checkSchema(ResourceSchema schema) throws IOException { + @Override + public void checkSchema(final ResourceSchema schema) throws IOException { // we don't care about types, they all get casted to ByteBuffers } - public void prepareToWrite(RecordWriter writer) { + @Override + public void prepareToWrite(final RecordWriter writer) { this.writer = writer; } - public void putNext(Tuple t) throws ExecException, IOException { - Mutation mut = new Mutation(objToText(t.get(0))); - Text cf = objToText(t.get(1)); - Text cq = objToText(t.get(2)); + @Override + public void putNext(final Tuple t) throws ExecException, IOException { + final Mutation mut = new Mutation(objToText(t.get(0))); + final Text cf = objToText(t.get(1)); + final Text cq = objToText(t.get(2)); if (t.size() > 4) { - Text cv = objToText(t.get(3)); - Value val = new Value(objToBytes(t.get(4))); + final Text cv = objToText(t.get(3)); + final Value val = new Value(objToBytes(t.get(4))); if (cv.getLength() == 0) { mut.put(cf, cq, val); } else { mut.put(cf, cq, new ColumnVisibility(cv), val); } } else { - Value val = new Value(objToBytes(t.get(3))); + final Value val = new Value(objToBytes(t.get(3))); mut.put(cf, cq, val); } try { writer.write(tableName, mut); - } catch (InterruptedException e) { + } 
catch (final InterruptedException e) { throw new IOException(e); } } - private static Text objToText(Object o) { + private static Text objToText(final Object o) { return new Text(objToBytes(o)); } - private static byte[] objToBytes(Object o) { + private static byte[] objToBytes(final Object o) { if (o instanceof String) { - String str = (String) o; - return str.getBytes(); + final String str = (String) o; + return str.getBytes(StandardCharsets.UTF_8); } else if (o instanceof Long) { - Long l = (Long) o; - return l.toString().getBytes(); + final Long l = (Long) o; + return l.toString().getBytes(StandardCharsets.UTF_8); } else if (o instanceof Integer) { - Integer l = (Integer) o; - return l.toString().getBytes(); + final Integer l = (Integer) o; + return l.toString().getBytes(StandardCharsets.UTF_8); } else if (o instanceof Boolean) { - Boolean l = (Boolean) o; - return l.toString().getBytes(); + final Boolean l = (Boolean) o; + return l.toString().getBytes(StandardCharsets.UTF_8); } else if (o instanceof Float) { - Float l = (Float) o; - return l.toString().getBytes(); + final Float l = (Float) o; + return l.toString().getBytes(StandardCharsets.UTF_8); } else if (o instanceof Double) { - Double l = (Double) o; - return l.toString().getBytes(); + final Double l = (Double) o; + return l.toString().getBytes(StandardCharsets.UTF_8); } // TODO: handle DataBag, Map<Object, Object>, and Tuple @@ -363,19 +373,20 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord return ((DataByteArray) o).get(); } - public void cleanupOnFailure(String failure, Job job) { + @Override + public void cleanupOnFailure(final String failure, final Job job) { } @Override - public WritableComparable<?> getSplitComparable(InputSplit inputSplit) throws IOException { + public WritableComparable<?> getSplitComparable(final InputSplit inputSplit) throws IOException { //cannot get access to the range directly - AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(baos); + final AccumuloInputFormat.RangeInputSplit rangeInputSplit = (AccumuloInputFormat.RangeInputSplit) inputSplit; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos); rangeInputSplit.write(out); out.close(); - DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); - Range range = new Range(); + final DataInputStream stream = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + final Range range = new Range(); range.readFields(stream); stream.close(); return range; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java index c1d426c..615c062 100644 --- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java +++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/IndexWritingTool.java @@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,8 +23,8 @@ package org.apache.rya.accumulo.pig; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.regex.Pattern; @@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -48,7 +47,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; @@ -56,8 +54,6 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.ProjectionElem; -import org.openrdf.query.algebra.ProjectionElemList; import org.openrdf.query.algebra.TupleExpr; import org.openrdf.query.parser.sparql.SPARQLParser; @@ -66,15 +62,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; public class IndexWritingTool extends Configured implements Tool { - + private static final String sparql_key = "SPARQL.VALUE"; private static String cardCounter = "count"; - - - public static void main(String[] args) throws Exception { - + + + public static void main(final String[] args) throws Exception { + ToolRunner.run(new Configuration(), new IndexWritingTool(), args); - + } @Override @@ -90,12 +86,12 @@ public class IndexWritingTool extends Configured implements Tool { final String passStr = args[5]; final String tablePrefix = args[6]; - String sparql = FileUtils.readFileToString(new File(sparqlFile)); + final String sparql = FileUtils.readFileToString(new File(sparqlFile)); - Job job = new Job(getConf(), "Write HDFS Index to Accumulo"); + final Job job = new Job(getConf(), "Write HDFS Index to Accumulo"); job.setJarByClass(this.getClass()); - Configuration jobConf = job.getConfiguration(); + final Configuration jobConf = job.getConfiguration(); jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); setVarOrders(sparql, jobConf); @@ -120,29 +116,29 @@ public class IndexWritingTool extends Configured implements Tool { setAccumuloOutput(instStr, zooStr, userStr, passStr, job, tableName); jobConf.set(sparql_key, sparql); - - int complete = job.waitForCompletion(true) ? 0 : -1; + + final int complete = job.waitForCompletion(true) ? 
0 : -1; if (complete == 0) { - - String[] varOrders = jobConf.getStrings("varOrders"); - String orders = Joiner.on("\u0000").join(varOrders); + + final String[] varOrders = jobConf.getStrings("varOrders"); + final String orders = Joiner.on("\u0000").join(varOrders); Instance inst; - + if (zooStr.equals("mock")) { inst = new MockInstance(instStr); } else { inst = new ZooKeeperInstance(instStr, zooStr); } - - Connector conn = inst.getConnector(userStr, passStr.getBytes()); - BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1); - - Counters counters = job.getCounters(); - Counter c1 = counters.findCounter(cardCounter, cardCounter); - - Mutation m = new Mutation("~SPARQL"); - Value v = new Value(sparql.getBytes()); + + final Connector conn = inst.getConnector(userStr, passStr.getBytes(StandardCharsets.UTF_8)); + final BatchWriter bw = conn.createBatchWriter(tableName, 10, 5000, 1); + + final Counters counters = job.getCounters(); + final Counter c1 = counters.findCounter(cardCounter, cardCounter); + + final Mutation m = new Mutation("~SPARQL"); + final Value v = new Value(sparql.getBytes(StandardCharsets.UTF_8)); m.put(new Text("" + c1.getValue()), new Text(orders), v); bw.addMutation(m); @@ -155,52 +151,52 @@ public class IndexWritingTool extends Configured implements Tool { } - - - public void setVarOrders(String s, Configuration conf) throws MalformedQueryException { - SPARQLParser parser = new SPARQLParser(); - TupleExpr query = parser.parseQuery(s, null).getTupleExpr(); - List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames()); - String projElems = Joiner.on(";").join(projList); + public void setVarOrders(final String s, final Configuration conf) throws MalformedQueryException { + + final SPARQLParser parser = new SPARQLParser(); + final TupleExpr query = parser.parseQuery(s, null).getTupleExpr(); + + final List<String> projList = Lists.newArrayList(((Projection) query).getProjectionElemList().getTargetNames()); + final String projElems = Joiner.on(";").join(projList); conf.set("projElems", projElems); - Pattern splitPattern1 = Pattern.compile("\n"); - Pattern splitPattern2 = Pattern.compile(","); - String[] lines = splitPattern1.split(s); + final Pattern splitPattern1 = Pattern.compile("\n"); + final Pattern splitPattern2 = Pattern.compile(","); + final String[] lines = splitPattern1.split(s); - List<String> varOrders = Lists.newArrayList(); - List<String> varOrderPos = Lists.newArrayList(); + final List<String> varOrders = Lists.newArrayList(); + final List<String> varOrderPos = Lists.newArrayList(); int orderNum = 0; - int projSizeSq = projList.size()*projList.size(); - + final int projSizeSq = projList.size()*projList.size(); + for (String t : lines) { if(orderNum > projSizeSq){ break; } - + String[] order = null; if (t.startsWith("#prefix")) { t = t.substring(7).trim(); order = splitPattern2.split(t, projList.size()); } - + String tempVarOrder = ""; String tempVarOrderPos = ""; if (order != null) { - for (String u : order) { + for (final String u : order) { if (tempVarOrder.length() == 0) { tempVarOrder = u.trim(); } else { tempVarOrder = tempVarOrder + ";" + u.trim(); } - int pos = projList.indexOf(u.trim()); + final int pos = projList.indexOf(u.trim()); if (pos < 0) { throw new IllegalArgumentException("Invalid variable order!"); } else { @@ -215,17 +211,17 @@ public class IndexWritingTool extends Configured implements Tool { varOrders.add(tempVarOrder); varOrderPos.add(tempVarOrderPos); } - + if(tempVarOrder.length() > 
0) { orderNum++; } } - + if(orderNum == 0) { varOrders.add(projElems); String tempVarPos = ""; - + for(int i = 0; i < projList.size(); i++) { if(i == 0) { tempVarPos = Integer.toString(0); @@ -234,29 +230,29 @@ public class IndexWritingTool extends Configured implements Tool { } } varOrderPos.add(tempVarPos); - + } - - String[] vOrders = varOrders.toArray(new String[varOrders.size()]); - String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]); - - - + + final String[] vOrders = varOrders.toArray(new String[varOrders.size()]); + final String[] vOrderPos = varOrderPos.toArray(new String[varOrderPos.size()]); + + + conf.setStrings("varOrders", vOrders); conf.setStrings("varOrderPos", vOrderPos); } - - private static void setAccumuloOutput(String instStr, String zooStr, String userStr, String passStr, Job job, String tableName) + + private static void setAccumuloOutput(final String instStr, final String zooStr, final String userStr, final String passStr, final Job job, final String tableName) throws AccumuloSecurityException { - AuthenticationToken token = new PasswordToken(passStr); + final AuthenticationToken token = new PasswordToken(passStr); AccumuloOutputFormat.setConnectorInfo(job, userStr, token); AccumuloOutputFormat.setDefaultTableName(job, tableName); AccumuloOutputFormat.setCreateTables(job, true); //TODO best way to do this? - + if (zooStr.equals("mock")) { AccumuloOutputFormat.setMockInstance(job, instStr); } else { @@ -270,41 +266,41 @@ public class IndexWritingTool extends Configured implements Tool { } public static class MyMapper extends Mapper<LongWritable, Text, Text, Mutation> { - + private static final Logger logger = Logger.getLogger(MyMapper.class); final static Text EMPTY_TEXT = new Text(); final static Value EMPTY_VALUE = new Value(new byte[] {}); private String[] varOrderPos = null; private String[] projElem = null; private Pattern splitPattern = null; - private List<List<Integer>> varPositions = Lists.newArrayList(); - - + private final List<List<Integer>> varPositions = Lists.newArrayList(); + + @Override - protected void setup(Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException, + protected void setup(final Mapper<LongWritable, Text, Text, Mutation>.Context context) throws IOException, InterruptedException { - - Configuration conf = context.getConfiguration(); - + + final Configuration conf = context.getConfiguration(); + varOrderPos = conf.getStrings("varOrderPos"); splitPattern = Pattern.compile("\t"); - - for (String s : varOrderPos) { - String[] pos = s.split(";"); - List<Integer> intPos = Lists.newArrayList(); + + for (final String s : varOrderPos) { + final String[] pos = s.split(";"); + final List<Integer> intPos = Lists.newArrayList(); int i = 0; - for(String t: pos) { + for(final String t: pos) { i = Integer.parseInt(t); intPos.add(i); } - + varPositions.add(intPos); - + } - + projElem = conf.get("projElems").split(";"); - + super.setup(context); } @@ -314,17 +310,17 @@ public class IndexWritingTool extends Configured implements Tool { @Override - public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException { + public void map(final LongWritable key, final Text value, final Context output) throws IOException, InterruptedException { + + final String[] result = splitPattern.split(value.toString()); - String[] result = splitPattern.split(value.toString()); - - for (List<Integer> list : varPositions) { + for (final List<Integer> list : varPositions) { String values = ""; 
String vars = ""; - for (Integer i : list) { + for (final Integer i : list) { if (values.length() == 0) { values = result[i]; @@ -335,7 +331,7 @@ public class IndexWritingTool extends Configured implements Tool { } } - Mutation m = new Mutation(new Text(values)); + final Mutation m = new Mutation(new Text(values)); m.put(new Text(vars), EMPTY_TEXT, EMPTY_VALUE); output.write(EMPTY_TEXT, m); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java index 782840c..5c2c52c 100644 --- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java +++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/SparqlQueryPigEngine.java @@ -1,5 +1,31 @@ package org.apache.rya.accumulo.pig; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRdfEvalStatsDAO; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.rdftriplestore.evaluation.QueryJoinOptimizer; +import org.apache.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics; +import org.apache.rya.rdftriplestore.inference.InferenceEngine; +import org.apache.rya.rdftriplestore.inference.InverseOfVisitor; +import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor; +import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.QueryParser; +import org.openrdf.query.parser.sparql.SPARQLParser; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +34,9 @@ package org.apache.rya.accumulo.pig; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,31 +48,6 @@ package org.apache.rya.accumulo.pig; import com.google.common.base.Preconditions; -import com.google.common.io.ByteStreams; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRdfEvalStatsDAO; -import org.apache.rya.accumulo.AccumuloRyaDAO; -import org.apache.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer; -import org.apache.rya.rdftriplestore.evaluation.QueryJoinOptimizer; -import org.apache.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics; -import org.apache.rya.rdftriplestore.inference.InferenceEngine; -import org.apache.rya.rdftriplestore.inference.InverseOfVisitor; -import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor; -import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.openrdf.query.algebra.QueryRoot; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.QueryParser; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; -import java.io.IOException; /** * Created by IntelliJ IDEA. @@ -74,7 +75,7 @@ public class SparqlQueryPigEngine { return conf; } - public void setConf(AccumuloRdfConfiguration conf) { + public void setConf(final AccumuloRdfConfiguration conf) { this.conf = conf; } @@ -92,14 +93,14 @@ public class SparqlQueryPigEngine { } if (inference || stats) { - String instance = sparqlToPigTransformVisitor.getInstance(); - String zoo = sparqlToPigTransformVisitor.getZk(); - String user = sparqlToPigTransformVisitor.getUser(); - String pass = sparqlToPigTransformVisitor.getPassword(); + final String instance = sparqlToPigTransformVisitor.getInstance(); + final String zoo = sparqlToPigTransformVisitor.getZk(); + final String user = sparqlToPigTransformVisitor.getUser(); + final String pass = sparqlToPigTransformVisitor.getPassword(); - Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes()); + final Connector connector = new ZooKeeperInstance(instance, zoo).getConnector(user, pass.getBytes(StandardCharsets.UTF_8)); - String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix(); + final String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix(); conf.setTablePrefix(tablePrefix); if (inference) { logger.info("Using inference"); @@ -147,28 +148,28 @@ public class SparqlQueryPigEngine { * @param hdfsSaveLocation to save the execution * @throws java.io.IOException */ - public void runQuery(String sparql, String hdfsSaveLocation) throws IOException { + public void runQuery(final String sparql, final String hdfsSaveLocation) throws IOException { Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null"); logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]"); pigServer.deleteFile(hdfsSaveLocation); try { - String pigScript = 
generatePigScript(sparql); + final String pigScript = generatePigScript(sparql); if (logger.isDebugEnabled()) { logger.debug("Pig script [" + pigScript + "]"); } - pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes())); + pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes(StandardCharsets.UTF_8))); pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant - } catch (Exception e) { + } catch (final Exception e) { throw new IOException(e); } } - public String generatePigScript(String sparql) throws Exception { + public String generatePigScript(final String sparql) throws Exception { Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); - QueryParser parser = new SPARQLParser(); - ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr()); + final QueryParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + final QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr()); // SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer(); // similarVarJoinOptimizer.optimize(tupleExpr, null, null); @@ -189,31 +190,31 @@ public class SparqlQueryPigEngine { } - public static void main(String[] args) { + public static void main(final String[] args) { try { Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk cbuser cbpassword rdfTablePrefix.\n " + "Sample command: java -cp java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf org.apache.rya.accumulo.pig.SparqlQueryPigEngine " + "tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_"); - String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0]))); - String hdfsSaveLocation = args[1]; - SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor(); + final String sparql = FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8); + final String hdfsSaveLocation = args[1]; + final SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor(); visitor.setTablePrefix(args[6]); visitor.setInstance(args[2]); visitor.setZk(args[3]); visitor.setUser(args[4]); visitor.setPassword(args[5]); - SparqlQueryPigEngine engine = new SparqlQueryPigEngine(); + final SparqlQueryPigEngine engine = new SparqlQueryPigEngine(); engine.setSparqlToPigTransformVisitor(visitor); engine.setInference(false); engine.setStats(false); - + engine.init(); engine.runQuery(sparql, hdfsSaveLocation); engine.destroy(); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); } } @@ -222,7 +223,7 @@ public class SparqlQueryPigEngine { return hadoopDir; } - public void setHadoopDir(String hadoopDir) { + public void setHadoopDir(final String hadoopDir) { this.hadoopDir = hadoopDir; } @@ -230,7 +231,7 @@ public class SparqlQueryPigEngine { return pigServer; } - public void setPigServer(PigServer pigServer) { + public void setPigServer(final PigServer pigServer) { this.pigServer = pigServer; } @@ -238,7 +239,7 @@ public class SparqlQueryPigEngine { return execType; } - public void setExecType(ExecType execType) { + public void setExecType(final ExecType execType) { this.execType = execType; } @@ -246,7 +247,7 @@ public class SparqlQueryPigEngine { return inference; } - public void setInference(boolean inference) 
{ + public void setInference(final boolean inference) { this.inference = inference; } @@ -254,7 +255,7 @@ public class SparqlQueryPigEngine { return stats; } - public void setStats(boolean stats) { + public void setStats(final boolean stats) { this.stats = stats; } @@ -262,7 +263,7 @@ public class SparqlQueryPigEngine { return sparqlToPigTransformVisitor; } - public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) { + public void setSparqlToPigTransformVisitor(final SparqlToPigTransformVisitor sparqlToPigTransformVisitor) { this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java index 974888b..93266df 100644 --- a/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java +++ b/pig/accumulo.pig/src/main/java/org/apache/rya/accumulo/pig/StatementPatternStorage.java @@ -8,9 +8,9 @@ package org.apache.rya.accumulo.pig; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,21 @@ package org.apache.rya.accumulo.pig; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; @@ -41,17 +52,6 @@ import org.apache.rya.api.resolver.RyaTripleContext; import org.apache.rya.api.resolver.triple.TripleRow; import org.apache.rya.rdftriplestore.inference.InferenceEngine; import org.apache.rya.rdftriplestore.inference.InferenceEngineException; - -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.openrdf.model.Resource; import org.openrdf.model.URI; import org.openrdf.model.Value; @@ -94,7 +94,7 @@ public class StatementPatternStorage extends AccumuloStorage { else { ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); } - + } private 
Value getValue(Var subjectVar) { @@ -115,8 +115,9 @@ public class StatementPatternStorage extends AccumuloStorage { addInferredRanges(table, job); } - if (layout == null || ranges.size() == 0) + if (layout == null || ranges.size() == 0) { throw new IllegalArgumentException("Range and/or layout is null. Check the query"); + } table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table); tableName = new Text(table); } @@ -195,8 +196,9 @@ public class StatementPatternStorage extends AccumuloStorage { RyaURI predicate_rya = RdfToRyaConversions.convertURI((URI) p_v); RyaType object_rya = RdfToRyaConversions.convertValue(o_v); TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subject_rya, predicate_rya, object_rya, null); - if (strategy == null) + if (strategy == null) { return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range()); + } Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subject_rya, predicate_rya, object_rya, null, null); ByteRange byteRange = entry.getValue(); return new RdfCloudTripleStoreUtils.CustomEntry<org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT, Range>( @@ -215,9 +217,9 @@ public class StatementPatternStorage extends AccumuloStorage { ryaDAO.setConf(rdfConf); try { if (!mock) { - ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes())); + ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes(StandardCharsets.UTF_8))); } else { - ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes())); + ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes(StandardCharsets.UTF_8))); } } catch (Exception e) { throw new IOException(e); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 3dc7c68..a6415ad 100644 --- a/pom.xml +++ b/pom.xml @@ -136,7 +136,6 @@ under the License. <jsr305.version>1.3.9-1</jsr305.version> <jcip.version>1.0-1</jcip.version> - <findbugs.plugin.version>3.0.4</findbugs.plugin.version> <kafka.version>0.10.0.1</kafka.version> <jopt-simple.version>4.9</jopt-simple.version> @@ -682,7 +681,6 @@ under the License. <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.mrunit</groupId> @@ -819,7 +817,6 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <configuration> - <argLine>-Dfile.encoding=${project.build.sourceEncoding}</argLine> <systemPropertyVariables> <java.io.tmpdir>${project.build.directory}</java.io.tmpdir> </systemPropertyVariables> @@ -957,6 +954,15 @@ under the License. <artifactId>license-maven-plugin</artifactId> <version>3.0</version> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.5</version> + <configuration> + <effort>Max</effort> + <threshold>Low</threshold> + </configuration> + </plugin> </plugins> </pluginManagement> @@ -1030,6 +1036,23 @@ under the License. </excludes> </configuration> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <failOnError>true</failOnError> <!-- These are serious defects that aren't allowed in Rya. Fail the build. 
--> + <visitors>DefaultEncodingDetector</visitors> <!-- Only specify detectors that should not detect any errors. --> + </configuration> + <executions> + <execution> + <id>analyze-compile</id> + <phase>compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> @@ -1038,7 +1061,6 @@ under the License. <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>findbugs-maven-plugin</artifactId> - <version>${findbugs.plugin.version}</version> </plugin> </plugins> </reporting> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java ---------------------------------------------------------------------- diff --git a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java index bf655ce..921acaa 100644 --- a/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java +++ b/sail/src/main/java/org/apache/rya/rdftriplestore/RdfCloudTripleStoreConnection.java @@ -1,5 +1,3 @@ -package org.apache.rya.rdftriplestore; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,13 +16,13 @@ package org.apache.rya.rdftriplestore; * specific language governing permissions and limitations * under the License. */ - - +package org.apache.rya.rdftriplestore; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.lang.reflect.Constructor; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -156,7 +154,7 @@ public class RdfCloudTripleStoreConnection extends SailConnectionBase { final Value object, final Resource... contexts) throws SailException { try { final String cv_s = conf.getCv(); - final byte[] cv = cv_s == null ? null : cv_s.getBytes(); + final byte[] cv = cv_s == null ? null : cv_s.getBytes(StandardCharsets.UTF_8); final List<RyaStatement> ryaStatements = new ArrayList<>(); if (contexts != null && contexts.length > 0) { for (final Resource context : contexts) {
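
The pom.xml hunk above is what keeps this cleanup from regressing: it pins findbugs-maven-plugin 3.0.5, binds its check goal to the compile phase with failOnError enabled, and limits the visitor list to DefaultEncodingDetector, the detector whose DM_DEFAULT_ENCODING pattern flags the String.getBytes() and new String(byte[]) calls this commit removed. A minimal sketch of what that gate rejects and accepts; the class and method names are illustrative, not from Rya:

    import java.io.ByteArrayOutputStream;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;

    public class EncodingGate {
        // Rejected: DefaultEncodingDetector reports DM_DEFAULT_ENCODING here,
        // and with failOnError=true the build stops at compile time.
        static byte[] flagged(final String s) {
            return s.getBytes();
        }

        // Accepted: the charset is explicit, so no default-encoding
        // bug pattern fires.
        static byte[] clean(final String s) {
            return s.getBytes(StandardCharsets.UTF_8);
        }

        // Same rule for writers: OutputStreamWriter(OutputStream) is flagged,
        // OutputStreamWriter(OutputStream, Charset) passes.
        static Writer cleanWriter(final ByteArrayOutputStream out) {
            return new OutputStreamWriter(out, StandardCharsets.UTF_8);
        }
    }
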