http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
index 50180ad..129bd6d 100644
--- a/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
+++ b/extras/periodic.notification/api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -19,6 +19,7 @@ package org.apache.rya.periodic.notification.serialization;
 
 import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -43,47 +44,47 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
 
     private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
     private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
-    private static final byte[] DELIM_BYTE = "\u0002".getBytes();
-
-    private byte[] toBytes(BindingSet bindingSet) {
+    private static final byte[] DELIM_BYTE = "\u0002".getBytes(StandardCharsets.UTF_8);
+
+    private byte[] toBytes(final BindingSet bindingSet) {
         try {
             return getBytes(getVarOrder(bindingSet), bindingSet);
-        } catch(Exception e) {
+        } catch(final Exception e) {
             log.trace("Unable to serialize BindingSet: " + bindingSet);
             return new byte[0];
         }
     }
 
-    private BindingSet fromBytes(byte[] bsBytes) {
+    private BindingSet fromBytes(final byte[] bsBytes) {
         try{
-            int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
-            byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
-            byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
-            VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
-            return getBindingSet(varOrder, bsBytesNoVarOrder);
-        } catch(Exception e) {
+            final int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+            final byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+            final byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length);
+            final VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+            return getBindingSet(varOrder, bsBytesNoVarOrder);
+        } catch(final Exception e) {
             log.trace("Unable to deserialize BindingSet: " + bsBytes);
             return new QueryBindingSet();
         }
     }
-
-    private VariableOrder getVarOrder(BindingSet bs) {
+
+    private VariableOrder getVarOrder(final BindingSet bs) {
        return new VariableOrder(bs.getBindingNames());
     }
-
-    private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
-        byte[] bsBytes = serializer.convert(bs, varOrder);
-        String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
-        byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+
+    private byte[] getBytes(final VariableOrder varOrder, final BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+        final byte[] bsBytes = serializer.convert(bs, varOrder);
+        final String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+        final byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
         return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
     }
-
-    private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+
+    private BindingSet getBindingSet(final VariableOrder varOrder, final byte[] bsBytes) throws BindingSetConversionException {
         return serializer.convert(bsBytes, varOrder);
     }
 
     @Override
-    public BindingSet deserialize(String topic, byte[] bytes) {
+    public BindingSet deserialize(final String topic, final byte[] bytes) {
         return fromBytes(bytes);
     }
 
@@ -93,12 +94,12 @@ public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<Bin
     }
 
     @Override
-    public void configure(Map<String, ?> arg0, boolean arg1) {
+    public void configure(final Map<String, ?> arg0, final boolean arg1) {
         // Do nothing.  Nothing to configure.
     }
 
     @Override
-    public byte[] serialize(String topic, BindingSet bs) {
+    public byte[] serialize(final String topic, final BindingSet bs) {
         return toBytes(bs);
     }
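For context on the DELIM_BYTE change above: "\u0002".getBytes() with no argument uses the JVM's platform default charset, so the delimiter byte written by one process need not match the byte searched for by another. Below is a minimal round-trip sketch of the same framing scheme (not part of the commit; names are illustrative), assuming only Guava's Bytes utilities, the class this file already imports:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import com.google.common.primitives.Bytes;

    public class DelimFramingSketch {
        // U+0002 encodes to the single byte 0x02 in UTF-8, so the framing is deterministic.
        private static final byte[] DELIM = "\u0002".getBytes(StandardCharsets.UTF_8);

        static byte[] frame(final byte[] header, final byte[] payload) {
            return Bytes.concat(header, DELIM, payload);
        }

        static byte[][] unframe(final byte[] framed) {
            final int i = Bytes.indexOf(framed, DELIM);  // first 0x02 splits header from payload
            return new byte[][] {
                    Arrays.copyOf(framed, i),
                    Arrays.copyOfRange(framed, i + 1, framed.length)};
        }
    }

The scheme assumes the delimiter byte never occurs inside the header, which holds here because variable names are serialized as UTF-8 text and 0x02 is a control character.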
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
index 115074c..1073b6e 100644
--- a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
+++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
@@ -19,6 +19,7 @@ package org.apache.rya.export.accumulo.util;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -336,7 +337,7 @@ public final class AccumuloRyaUtils {
     public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException {
         final List<String> authList = new ArrayList<>();
         for (final byte[] authBytes : auths.getAuthorizations()) {
-            final String auth = new String(authBytes);
+            final String auth = new String(authBytes, StandardCharsets.UTF_8);
             authList.add(auth);
         }
         return addUserAuths(user, secOps, authList.toArray(new String[0]));
@@ -358,7 +359,7 @@ public final class AccumuloRyaUtils {
             authList.add(currentAuth);
         }
         for (final String newAuth : auths) {
-            authList.add(newAuth.getBytes());
+            authList.add(newAuth.getBytes(StandardCharsets.UTF_8));
         }
         final Authorizations result = new Authorizations(authList);
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
index f3d523f..fd1bc4d 100644
--- a/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
+++ b/extras/rya.export/export.client/src/main/java/org/apache/rya/export/client/merge/VisibilityStatementMerger.java
@@ -18,6 +18,8 @@
  */
 package org.apache.rya.export.client.merge;
 
+import java.nio.charset.StandardCharsets;
+
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.export.api.MergerException;
 import org.apache.rya.export.api.StatementMerger;
@@ -41,8 +43,8 @@
             final RyaStatement parentStatement = parent.get();
             if(child.isPresent()) {
                 final RyaStatement childStatement = child.get();
-                final String pVis = new String(parentStatement.getColumnVisibility());
-                final String cVis = new String(childStatement.getColumnVisibility());
+                final String pVis = new String(parentStatement.getColumnVisibility(), StandardCharsets.UTF_8);
+                final String cVis = new String(childStatement.getColumnVisibility(), StandardCharsets.UTF_8);
                 String visibility = "";
                 final Joiner join = Joiner.on(")&(");
                 if(pVis.isEmpty() || cVis.isEmpty()) {
@@ -50,7 +52,7 @@
                 } else {
                     visibility = "(" +
                         join.join(pVis, cVis) + ")";
                 }
-                parentStatement.setColumnVisibility(visibility.getBytes());
+                parentStatement.setColumnVisibility(visibility.getBytes(StandardCharsets.UTF_8));
                 return Optional.of(parentStatement);
             }
             return parent;
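The merge above AND-combines two Accumulo visibility expressions by wrapping each side in its own parenthesized group. A small sketch (not from the commit; the expressions are stand-ins) showing the resulting string and validating it with Accumulo's ColumnVisibility parser:

    import org.apache.accumulo.core.security.ColumnVisibility;

    import com.google.common.base.Joiner;

    public class VisibilityMergeSketch {
        public static void main(final String[] args) {
            final String pVis = "A|B";  // stand-in for the decoded parent visibility
            final String cVis = "C";    // stand-in for the decoded child visibility
            // Mirrors the Joiner.on(")&(") trick above: each side keeps its own group.
            final String merged = "(" + Joiner.on(")&(").join(pVis, cVis) + ")";
            new ColumnVisibility(merged);  // parses the expression; throws if it is malformed
            System.out.println(merged);    // (A|B)&(C)
        }
    }

Decoding the stored visibility bytes as UTF-8, rather than the platform default, guarantees the same expression string, and therefore the same parse, on every JVM.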
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
index 4597400..59b92ba 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java
@@ -22,6 +22,7 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.text.ParseException;
 import java.util.ArrayList;
@@ -81,9 +82,6 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.Logger;
 import org.apache.log4j.xml.DOMConfigurator;
-
-import com.google.common.base.Joiner;
-
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.mr.AccumuloHDFSFileInputFormat;
 import org.apache.rya.accumulo.mr.MRUtils;
@@ -105,6 +103,8 @@ import org.apache.rya.api.RdfCloudTripleStoreUtils;
 import org.apache.rya.api.layout.TablePrefixLayoutStrategy;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
 
+import com.google.common.base.Joiner;
+
 /**
  * Handles copying data from a parent instance into a child instance.
  */
@@ -589,9 +589,9 @@
             final Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt");
             final Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100);
             log.info("Creating splits file at: " + splitsPath);
-            try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) {
+            try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)), false, StandardCharsets.UTF_8.name())) {
                 for (final Text split : splits) {
-                    final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)));
+                    final String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)), StandardCharsets.UTF_8);
                     out.println(encoded);
                 }
             }
@@ -873,12 +873,7 @@
         }
 
         log.info("Starting Copy Tool");
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread thread, final Throwable throwable) {
-                log.error("Uncaught exception in " + thread.getName(), throwable);
-            }
-        });
+        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> log.error("Uncaught exception in " + thread.getName(), throwable));
 
         final CopyTool copyTool = new CopyTool();
         final int returnCode = copyTool.setupAndRun(args);
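Two of the CopyTool changes deserve a note. A PrintStream built without a charset writes in the platform default encoding, so the splits file could differ between the machine that wrote it and the machine that reads it; the PrintStream(OutputStream, boolean, String) constructor pins it to UTF-8. And Thread.UncaughtExceptionHandler is a single-method interface, so the anonymous class collapses to a lambda with identical behavior. A runnable equivalence sketch (names illustrative, not from the commit):

    public class HandlerSketch {
        public static void main(final String[] args) throws InterruptedException {
            Thread.setDefaultUncaughtExceptionHandler(
                    (thread, throwable) -> System.err.println(
                            "Uncaught exception in " + thread.getName() + ": " + throwable));
            final Thread worker = new Thread(() -> {
                throw new IllegalStateException("boom");  // never caught inside the thread itself
            }, "worker-0");
            worker.start();
            worker.join();  // the handler runs on the dying thread before join() returns
        }
    }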
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
index e702e64..5a3b928 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java
@@ -19,6 +19,7 @@ package org.apache.rya.accumulo.mr.merge.util;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -49,13 +50,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import org.openrdf.model.Literal;
-import org.openrdf.model.ValueFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableSet;
-
-import info.aduna.iteration.CloseableIteration;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.accumulo.mr.MRUtils;
@@ -66,6 +60,13 @@ import org.apache.rya.api.domain.RyaURI;
 import org.apache.rya.api.persist.RyaDAOException;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.openrdf.model.Literal;
+import org.openrdf.model.ValueFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
+
+import info.aduna.iteration.CloseableIteration;
 
 /**
  * Utility methods for an Accumulo Rya instance.
 */
@@ -515,7 +516,7 @@ public final class AccumuloRyaUtils {
     public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException {
         final List<String> authList = new ArrayList<>();
         for (final byte[] authBytes : auths.getAuthorizations()) {
-            final String auth = new String(authBytes);
+            final String auth = new String(authBytes, StandardCharsets.UTF_8);
             authList.add(auth);
         }
         return addUserAuths(user, secOps, authList.toArray(new String[0]));
@@ -537,7 +538,7 @@ public final class AccumuloRyaUtils {
             authList.add(currentAuth);
         }
         for (final String newAuth : auths) {
-            authList.add(newAuth.getBytes());
+            authList.add(newAuth.getBytes(StandardCharsets.UTF_8));
         }
         final Authorizations result = new Authorizations(authList);
         return result;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
index 9627c54..42109db 100644
--- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
+++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/util/QueryRuleset.java
@@ -18,15 +18,32 @@
  */
 package org.apache.rya.accumulo.mr.merge.util;
 
-import java.io.BufferedReader;
-import java.io.FileReader;
+import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.mr.merge.CopyTool;
+import org.apache.rya.accumulo.mr.merge.util.QueryRuleset.QueryRulesetException;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.inference.InferJoin;
+import org.apache.rya.rdftriplestore.inference.InferUnion;
+import org.apache.rya.rdftriplestore.inference.InferenceEngine;
+import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SameAsVisitor;
+import org.apache.rya.rdftriplestore.inference.SubClassOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SubPropertyOfVisitor;
+import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
+import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
+import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
+import org.apache.rya.rdftriplestore.utils.TransitivePropertySP;
+import org.apache.rya.sail.config.RyaSailFactory;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
@@ -52,22 +69,6 @@ import org.openrdf.query.parser.ParsedTupleQuery;
 import org.openrdf.query.parser.QueryParserUtil;
 import org.openrdf.sail.SailException;
 
-import org.apache.rya.accumulo.mr.merge.CopyTool;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
-import org.apache.rya.rdftriplestore.inference.InferJoin;
-import org.apache.rya.rdftriplestore.inference.InferUnion;
-import org.apache.rya.rdftriplestore.inference.InferenceEngine;
-import org.apache.rya.rdftriplestore.inference.InverseOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SameAsVisitor;
-import org.apache.rya.rdftriplestore.inference.SubClassOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SubPropertyOfVisitor;
-import org.apache.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
-import org.apache.rya.rdftriplestore.inference.TransitivePropertyVisitor;
-import org.apache.rya.rdftriplestore.utils.FixedStatementPattern;
-import org.apache.rya.rdftriplestore.utils.TransitivePropertySP;
-import org.apache.rya.sail.config.RyaSailFactory;
-
 /**
  * Represents a set of {@link CopyRule} instances derived from a query. The ruleset determines a logical
 * subset of statements in Rya, such that statements selected by the ruleset are at least enough to answer
@@ -432,16 +433,7 @@
         final String queryFile = conf.get(CopyTool.QUERY_FILE_PROP);
         if (query == null && queryFile != null) {
             try {
-                final FileReader fileReader = new FileReader(queryFile);
-                final BufferedReader reader = new BufferedReader(fileReader);
-                final StringBuilder builder = new StringBuilder();
-                String line = reader.readLine();
-                while (line != null) {
-                    builder.append(line).append("\n");
-                    line = reader.readLine();
-                }
-                query = builder.toString();
-                reader.close();
+                query = FileUtils.readFileToString(new File(queryFile), StandardCharsets.UTF_8);
                 conf.set(CopyTool.QUERY_STRING_PROP, query);
             } catch (final IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
index 4070849..3fea6ed 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.rya.indexing.pcj.fluo.demo;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
@@ -96,7 +97,7 @@ public class FluoAndHistoricPcjsDemo implements Demo {
     /**
     * Used to pause the demo waiting for the presenter to hit the Enter key.
     */
-    private final java.util.Scanner keyboard = new java.util.Scanner(System.in);
+    private final java.util.Scanner keyboard = new java.util.Scanner(System.in, StandardCharsets.UTF_8.name());
 
     @Override
     public void execute(
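One behavioral note on the QueryRuleset change above (an observation, not something the commit states): the removed loop rebuilt the query with "\n" appended to every line, while FileUtils.readFileToString returns the decoded file content exactly, preserving the original line endings and any missing trailing newline. Both forms parse identically as SPARQL, but the strings are not guaranteed byte-identical. A minimal sketch of the new read path, with the file path taken as an assumed argument:

    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.commons.io.FileUtils;

    public class ReadQuerySketch {
        public static void main(final String[] args) throws IOException {
            // One call replaces the reader/StringBuilder loop, and the explicit
            // charset removes FileReader's platform-default decoding.
            final String query = FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8);
            System.out.println(query);
        }
    }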
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java b/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
index 105e852..f630df0 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/joinselect/mr/JoinSelectProspectOutput.java
@@ -1,5 +1,3 @@
-package org.apache.rya.joinselect.mr;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -8,9 +6,9 @@ package org.apache.rya.joinselect.mr;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,21 +16,16 @@ package org.apache.rya.joinselect.mr;
 * specific language governing permissions and limitations
 * under the License.
 */
-
-
+package org.apache.rya.joinselect.mr;
 
 import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.AUTHS;
 import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_OUTPUTPATH;
 import static org.apache.rya.joinselect.mr.utils.JoinSelectConstants.PROSPECTS_TABLE;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.regex.Pattern;
 
-import org.apache.rya.joinselect.mr.utils.CardinalityType;
-import org.apache.rya.joinselect.mr.utils.CompositeType;
-import org.apache.rya.joinselect.mr.utils.JoinSelectStatsUtil;
-import org.apache.rya.joinselect.mr.utils.TripleCard;
-
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
@@ -45,6 +38,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Tool;
+import org.apache.rya.joinselect.mr.utils.CardinalityType;
+import org.apache.rya.joinselect.mr.utils.CompositeType;
+import org.apache.rya.joinselect.mr.utils.JoinSelectStatsUtil;
+import org.apache.rya.joinselect.mr.utils.TripleCard;
 
 public class JoinSelectProspectOutput extends Configured implements Tool {
@@ -55,21 +52,22 @@ public class JoinSelectProspectOutput extends Configured implements Tool {
 
     Text inText = new Text();
     Pattern splitPattern = Pattern.compile(DELIM);
 
-    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+    @Override
+    public void map(final Key key, final Value data, final Context context) throws IOException, InterruptedException {
       key.getRow(inText);
-      String[] cardData = splitPattern.split(inText.toString().trim(), 4);
+      final String[] cardData = splitPattern.split(inText.toString().trim(), 4);
       // System.out.println("Card data is " + cardData[0] + ", "+ cardData[1] + ", "+ cardData[2]);
       if (cardData.length == 3 && ((cardData[0].equals("subject")) || (cardData[0].equals("object")) || (cardData[0].equals("predicate")))) {
-        Text tripleValType = new Text(cardData[0]);
-        Text cardKey = new Text(cardData[1]);
-        LongWritable ts = new LongWritable(Long.valueOf(cardData[2]));
+        final Text tripleValType = new Text(cardData[0]);
+        final Text cardKey = new Text(cardData[1]);
+        final LongWritable ts = new LongWritable(Long.valueOf(cardData[2]));
 
-        String s = new String(data.get());
-        LongWritable card = new LongWritable(Long.parseLong(s));
+        final String s = new String(data.get(), StandardCharsets.UTF_8);
+        final LongWritable card = new LongWritable(Long.parseLong(s));
 
-        CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
-        TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
+        final CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
+        final TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
 
         context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts)));
         // System.out.println("Card mapper output key is " + cType + " and value is " + tCard );
@@ -77,15 +75,15 @@
       } else if (cardData.length == 4 && ((cardData[0].equals("subjectpredicate")) || (cardData[0].equals("subjectobject")) || (cardData[0].equals("predicateobject")))) {
-        Text tripleValType = new Text(cardData[0]);
-        Text cardKey = new Text(cardData[1] + DELIM + cardData[2]);
-        LongWritable ts = new LongWritable(Long.valueOf(cardData[3]));
+        final Text tripleValType = new Text(cardData[0]);
+        final Text cardKey = new Text(cardData[1] + DELIM + cardData[2]);
+        final LongWritable ts = new LongWritable(Long.valueOf(cardData[3]));
 
-        String s = new String(data.get());
-        LongWritable card = new LongWritable(Long.parseLong(s));
+        final String s = new String(data.get(), StandardCharsets.UTF_8);
+        final LongWritable card = new LongWritable(Long.parseLong(s));
 
-        CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
-        TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
+        final CompositeType cType = new CompositeType(cardKey, new IntWritable(1));
+        final TripleCard tCard = new TripleCard(new CardinalityType(card, tripleValType, ts));
 
         context.write(new CompositeType(cardKey, new IntWritable(1)), new TripleCard(new CardinalityType(card, tripleValType, ts)));
         // System.out.println("Card mapper output key is " + cType + " and value is " + tCard );
@@ -97,16 +95,16 @@
   }
 
   @Override
-  public int run(String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException {
+  public int run(final String[] args) throws AccumuloSecurityException, IOException, ClassNotFoundException, InterruptedException {
 
-    Configuration conf = getConf();
-    String inTable = conf.get(PROSPECTS_TABLE);
-    String auths = conf.get(AUTHS);
-    String outPath = conf.get(PROSPECTS_OUTPUTPATH);
+    final Configuration conf = getConf();
+    final String inTable = conf.get(PROSPECTS_TABLE);
+    final String auths = conf.get(AUTHS);
+    final String outPath = conf.get(PROSPECTS_OUTPUTPATH);
     assert inTable != null && outPath != null;
 
-    Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    final Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
     job.setJarByClass(this.getClass());
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
index ebcf6c3..f408b7d 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/plans/impl/CountPlan.java
@@ -21,6 +21,7 @@ package org.apache.rya.prospector.plans.impl;
 
 import static org.apache.rya.prospector.utils.ProspectorConstants.COUNT;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -62,7 +63,7 @@ import org.openrdf.model.vocabulary.XMLSchema;
 public class CountPlan implements IndexWorkPlan {
 
     @Override
-    public Collection<Map.Entry<IntermediateProspect, LongWritable>> map(RyaStatement ryaStatement) {
+    public Collection<Map.Entry<IntermediateProspect, LongWritable>> map(final RyaStatement ryaStatement) {
         final RyaURI subject = ryaStatement.getSubject();
         final RyaURI predicate = ryaStatement.getPredicate();
         final String subjpred = ryaStatement.getSubject().getData() + DELIM + ryaStatement.getPredicate().getData();
@@ -71,7 +72,7 @@ public class CountPlan implements IndexWorkPlan {
         final RyaType object = ryaStatement.getObject();
         final int localIndex = URIUtil.getLocalNameIndex(subject.getData());
         final String namespace = subject.getData().substring(0, localIndex - 1);
-        final String visibility = new String(ryaStatement.getColumnVisibility());
+        final String visibility = new String(ryaStatement.getColumnVisibility(), StandardCharsets.UTF_8);
 
         final List<Map.Entry<IntermediateProspect, LongWritable>> entries = new ArrayList<>(7);
@@ -149,7 +150,7 @@
     }
 
     @Override
-    public Collection<Map.Entry<IntermediateProspect, LongWritable>> combine(IntermediateProspect prospect, Iterable<LongWritable> counts) {
+    public Collection<Map.Entry<IntermediateProspect, LongWritable>> combine(final IntermediateProspect prospect, final Iterable<LongWritable> counts) {
         long sum = 0;
         for(final LongWritable count : counts) {
             sum += count.get();
@@ -158,7 +159,7 @@
     }
 
     @Override
-    public void reduce(IntermediateProspect prospect, Iterable<LongWritable> counts, Date timestamp, Reducer.Context context) throws IOException, InterruptedException {
+    public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
         long sum = 0;
         for(final LongWritable count : counts) {
             sum += count.get();
@@ -172,7 +173,7 @@
         final String dataType = prospect.getDataType();
         final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
 
-        final Value sumValue = new Value(("" + sum).getBytes());
+        final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
         m.put(COUNT, prospect.getDataType(), visibility, timestamp.getTime(), sumValue);
 
         context.write(null, m);
@@ -185,7 +186,7 @@
     }
 
     @Override
-    public String getCompositeValue(List<String> indices){
+    public String getCompositeValue(final List<String> indices){
         final Iterator<String> indexIt = indices.iterator();
         String compositeIndex = indexIt.next();
         while (indexIt.hasNext()){
@@ -196,7 +197,7 @@
     }
 
     @Override
-    public List<IndexEntry> query(Connector connector, String tableName, List<Long> prospectTimes, String type, String compositeIndex, String dataType, String[] auths) throws TableNotFoundException {
+    public List<IndexEntry> query(final Connector connector, final String tableName, final List<Long> prospectTimes, final String type, final String compositeIndex, final String dataType, final String[] auths) throws TableNotFoundException {
         assert connector != null && tableName != null && type != null && compositeIndex != null;
 
         final BatchScanner bs = connector.createBatchScanner(tableName, new Authorizations(auths), 4);
@@ -242,7 +243,7 @@
             // Create an entry using the values that were found.
             final String entryDataType = k.getColumnQualifier().toString();
             final String entryVisibility = k.getColumnVisibility().toString();
-            final Long entryCount = Long.parseLong(new String(v.get()));
+            final Long entryCount = Long.parseLong(new String(v.get(), StandardCharsets.UTF_8));
 
             indexEntries.add(
                     IndexEntry.builder()
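CountPlan shows both halves of the same contract: reduce() encodes the sum as UTF-8 text into a Value, and query() parses it back with the same charset. A write-side sketch (row, family, and qualifier strings here are illustrative only, not the table's real layout):

    import java.nio.charset.StandardCharsets;

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.ColumnVisibility;

    public class CountWriteSketch {
        static Mutation countMutation(final String row, final String dataType,
                final ColumnVisibility visibility, final long timestamp, final long sum) {
            final Mutation m = new Mutation(row);
            // The encode mirrors the UTF-8 decode used when the prospect table is scanned.
            final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
            m.put("count", dataType, visibility, timestamp, sumValue);
            return m;
        }
    }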
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
index 4dc9253..d75730b 100644
--- a/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
+++ b/extras/rya.prospector/src/main/java/org/apache/rya/prospector/utils/ProspectorUtils.java
@@ -24,6 +24,7 @@ import static org.apache.rya.prospector.utils.ProspectorConstants.PASSWORD;
 import static org.apache.rya.prospector.utils.ProspectorConstants.USERNAME;
 import static org.apache.rya.prospector.utils.ProspectorConstants.ZOOKEEPERS;
 
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
 import java.util.Date;
@@ -56,7 +57,7 @@
     public static final long INDEXED_DATE_SORT_VAL = 999999999999999999L; // 18 char long, same length as date format pattern below
     public static final String INDEXED_DATE_FORMAT = "yyyyMMddHHmmsssSSS";
 
-    public static String getReverseIndexDateTime(Date date) {
+    public static String getReverseIndexDateTime(final Date date) {
         Validate.notNull(date);
         final String formattedDateString = new SimpleDateFormat(INDEXED_DATE_FORMAT).format(date);
         final long diff = INDEXED_DATE_SORT_VAL - Long.valueOf(formattedDateString);
@@ -64,7 +65,7 @@
         return Long.toString(diff);
     }
 
-    public static Map<String, IndexWorkPlan> planMap(Collection<IndexWorkPlan> plans) {
+    public static Map<String, IndexWorkPlan> planMap(final Collection<IndexWorkPlan> plans) {
         final Map<String, IndexWorkPlan> planMap = new HashMap<>();
         for(final IndexWorkPlan plan : plans) {
             planMap.put(plan.getIndexType(), plan);
@@ -72,7 +73,7 @@
         return planMap;
     }
 
-    public static void initMRJob(Job job, String table, String outtable, String[] auths) throws AccumuloSecurityException {
+    public static void initMRJob(final Job job, final String table, final String outtable, final String[] auths) throws AccumuloSecurityException {
         final Configuration conf = job.getConfiguration();
         final String username = conf.get(USERNAME);
         final String password = conf.get(PASSWORD);
@@ -91,7 +92,7 @@
             throw new IllegalArgumentException("Must specify either mock or zookeepers");
         }
 
-        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes()));
+        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
         AccumuloInputFormat.setInputTableName(job, table);
         job.setInputFormatClass(AccumuloInputFormat.class);
         AccumuloInputFormat.setScanAuthorizations(job, new Authorizations(auths));
@@ -100,11 +101,11 @@
         job.setOutputFormatClass(AccumuloOutputFormat.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Mutation.class);
-        AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes()));
+        AccumuloOutputFormat.setConnectorInfo(job, username, new PasswordToken(password.getBytes(StandardCharsets.UTF_8)));
         AccumuloOutputFormat.setDefaultTableName(job, outtable);
     }
 
-    public static void addMRPerformance(Configuration conf) {
+    public static void addMRPerformance(final Configuration conf) {
         conf.setBoolean("mapred.map.tasks.speculative.execution", false);
         conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
         conf.set("io.sort.mb", "256");
@@ -112,7 +113,7 @@
         conf.set("mapred.map.output.compression.codec", GzipCodec.class.getName());
     }
 
-    public static Instance instance(Configuration conf) {
+    public static Instance instance(final Configuration conf) {
         assert conf != null;
 
         final String instance_str = conf.get(INSTANCE);
@@ -127,7 +128,7 @@
         }
     }
 
-    public static Connector connector(Instance instance, Configuration conf) throws AccumuloException, AccumuloSecurityException {
+    public static Connector connector(Instance instance, final Configuration conf) throws AccumuloException, AccumuloSecurityException {
         final String username = conf.get(USERNAME);
         final String password = conf.get(PASSWORD);
         if (instance == null) {
@@ -136,7 +137,7 @@
         return instance.getConnector(username, new PasswordToken(password));
     }
 
-    public static void writeMutations(Connector connector, String tableName, Collection<Mutation> mutations) throws TableNotFoundException, MutationsRejectedException {
+    public static void writeMutations(final Connector connector, final String tableName, final Collection<Mutation> mutations) throws TableNotFoundException, MutationsRejectedException {
        final BatchWriter bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4);
        for(final Mutation mutation : mutations) {
            bw.addMutation(mutation);
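Aside from the charset fixes, ProspectorUtils.getReverseIndexDateTime (unchanged above except for the final modifier) relies on a classic reverse-timestamp trick: subtracting a fixed-width numeric date from an all-nines constant of the same width inverts sort order, so newer entries scan first. A small sketch of just that arithmetic, with illustrative 18-digit values:

    public class ReverseDateSketch {
        static final long SORT_VAL = 999999999999999999L;  // 18 nines, same width as the date format

        static String reverse(final long formattedDate) {
            return Long.toString(SORT_VAL - formattedDate);
        }

        public static void main(final String[] args) {
            final long older = 201701020304055678L;  // illustrative yyyyMMddHHmmsssSSS values
            final long newer = 201801020304055678L;
            // Reversed keys keep their 18-digit width, so string order matches numeric order,
            // and the newer date now sorts lexicographically before the older one.
            System.out.println(reverse(newer).compareTo(reverse(older)) < 0);  // true
        }
    }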
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java b/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
index bb78373..5515a5b 100644
--- a/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
+++ b/extras/rya.reasoning/src/main/java/org/apache/rya/reasoning/mr/ConformanceTest.java
@@ -25,6 +25,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,10 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.rya.accumulo.mr.MRUtils;
-import org.apache.rya.reasoning.Fact;
-import org.apache.rya.reasoning.Schema;
-
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -45,13 +42,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.reasoning.Fact;
+import org.apache.rya.reasoning.Schema;
 import org.openrdf.OpenRDFException;
 import org.openrdf.model.Resource;
 import org.openrdf.model.Statement;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
-import org.openrdf.model.vocabulary.RDF;
 import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
 import org.openrdf.query.BindingSet;
 import org.openrdf.query.QueryLanguage;
 import org.openrdf.query.TupleQuery;
@@ -98,7 +98,7 @@
         Set<Statement> inferred = new HashSet<>();
         Set<Statement> error = new HashSet<>();
         @Override
-        public void handleStatement(Statement st) {
+        public void handleStatement(final Statement st) {
             if (types.contains(TEST_ENTAILMENT)) {
                 expected.add(st);
             }
@@ -107,7 +107,7 @@
             }
         }
         String type() {
-            StringBuilder sb = new StringBuilder();
+            final StringBuilder sb = new StringBuilder();
             if (types.contains(TEST_CONSISTENCY)) {
                 sb.append("{Consistency}");
             }
@@ -127,17 +127,17 @@
     private static class OutputCollector extends RDFHandlerBase {
         Set<Statement> triples = new HashSet<>();
         @Override
-        public void handleStatement(Statement st) {
+        public void handleStatement(final Statement st) {
             triples.add(st);
         }
     }
 
-    public static void main(String[] args) throws Exception {
+    public static void main(final String[] args) throws Exception {
         ToolRunner.run(new ConformanceTest(), args);
     }
 
     @Override
-    public int run(String[] args) throws Exception {
+    public int run(final String[] args) throws Exception {
         // Validate command
         if (args.length < 1 || args.length > 2) {
             System.out.println("Usage:\n");
@@ -155,11 +155,11 @@
             System.exit(1);
         }
 
-        Set<Value> conformanceTestURIs = new HashSet<>();
+        final Set<Value> conformanceTestURIs = new HashSet<>();
         Collection<OwlTest> conformanceTests = new LinkedList<>();
-        List<OwlTest> successes = new LinkedList<>();
-        List<OwlTest> failures = new LinkedList<>();
-        Configuration conf = getConf();
+        final List<OwlTest> successes = new LinkedList<>();
+        final List<OwlTest> failures = new LinkedList<>();
+        final Configuration conf = getConf();
         Repository repo;
         File workingDir;
@@ -167,13 +167,13 @@
         if (args.length == 2) {
             workingDir = new File(args[1]);
             RDFFormat inputFormat= RDFFormat.RDFXML;
-            String formatString = conf.get(MRUtils.FORMAT_PROP);
+            final String formatString = conf.get(MRUtils.FORMAT_PROP);
             if (formatString != null) {
                 inputFormat = RDFFormat.valueOf(formatString);
             }
             repo = new SailRepository(new MemoryStore());
             repo.initialize();
-            RepositoryConnection conn = repo.getConnection();
+            final RepositoryConnection conn = repo.getConnection();
             conn.add(new FileInputStream(args[0]), "", inputFormat);
             conn.close();
         }
@@ -185,7 +185,7 @@
         }
 
         // Query for the tests we're interested in
-        RepositoryConnection conn = repo.getConnection();
+        final RepositoryConnection conn = repo.getConnection();
         conformanceTestURIs.addAll(getTestURIs(conn, TEST_INCONSISTENCY));
         conformanceTestURIs.addAll(getTestURIs(conn, TEST_CONSISTENCY));
         conformanceTestURIs.addAll(getTestURIs(conn, TEST_ENTAILMENT));
@@ -195,9 +195,9 @@
         repo.shutDown();
 
         // Set up a MiniAccumulo cluster and set up conf to connect to it
-        String username = "root";
-        String password = "root";
-        MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
+        final String username = "root";
+        final String password = "root";
+        final MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password);
         mini.start();
         conf.set(MRUtils.AC_INSTANCE_PROP, mini.getInstanceName());
         conf.set(MRUtils.AC_ZK_PROP, mini.getZooKeepers());
@@ -207,7 +207,7 @@
         conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "temp_");
         // Run the conformance tests
         int result;
-        for (OwlTest test : conformanceTests) {
+        for (final OwlTest test : conformanceTests) {
             System.out.println(test.uri);
             result = runTest(conf, args, test);
             if (result != 0) {
@@ -225,14 +225,14 @@
         mini.stop();
 
         System.out.println("\n" + successes.size() + " successful tests:");
-        for (OwlTest test : successes) {
+        for (final OwlTest test : successes) {
             System.out.println("\t[SUCCESS] " + test.type() + " " + test.name);
         }
         System.out.println("\n" + failures.size() + " failed tests:");
-        for (OwlTest test : failures) {
+        for (final OwlTest test : failures) {
             System.out.println("\t[FAIL] " + test.type() + " " + test.name);
             System.out.println("\t\t(" + test.description + ")");
-            for (Statement triple : test.error) {
+            for (final Statement triple : test.error) {
                 if (test.types.contains(TEST_ENTAILMENT)) {
                     System.out.println("\t\tExpected: " + triple);
                 }
@@ -250,23 +250,23 @@
      * @param OwlTest Contains premise/conclusion graphs, will store result
     * @return Return value of the MapReduce job
     */
-    int runTest(Configuration conf, String[] args, OwlTest test)
+    int runTest(final Configuration conf, final String[] args, final OwlTest test)
             throws Exception {
         conf.setInt(MRReasoningUtils.STEP_PROP, 0);
         conf.setInt(MRReasoningUtils.SCHEMA_UPDATE_PROP, 0);
         conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, true);
         conf.setBoolean(MRReasoningUtils.OUTPUT_FLAG, true);
         // Connect to MiniAccumulo and load the test
-        Repository repo = MRReasoningUtils.getRepository(conf);
+        final Repository repo = MRReasoningUtils.getRepository(conf);
         repo.initialize();
-        RepositoryConnection conn = repo.getConnection();
+        final RepositoryConnection conn = repo.getConnection();
         conn.clear();
         conn.add(new StringReader(test.premise), "", RDFFormat.RDFXML);
         conn.close();
         repo.shutDown();
         // Run the reasoner
-        ReasoningDriver reasoner = new ReasoningDriver();
-        int result = ToolRunner.run(conf, reasoner, args);
+        final ReasoningDriver reasoner = new ReasoningDriver();
+        final int result = ToolRunner.run(conf, reasoner, args);
         test.success = (result == 0);
         // Inconsistency test: successful if determined inconsistent
         if (test.types.contains(TEST_INCONSISTENCY)) {
@@ -281,21 +281,21 @@
                 || test.types.contains(TEST_ENTAILMENT)) {
             System.out.println("Reading inferred triples...");
             // Read in the inferred triples from HDFS:
-            Schema schema = MRReasoningUtils.loadSchema(conf);
-            FileSystem fs = FileSystem.get(conf);
-            Path path = MRReasoningUtils.getOutputPath(conf, "final");
-            OutputCollector inferred = new OutputCollector();
-            NTriplesParser parser = new NTriplesParser();
+            final Schema schema = MRReasoningUtils.loadSchema(conf);
+            final FileSystem fs = FileSystem.get(conf);
+            final Path path = MRReasoningUtils.getOutputPath(conf, "final");
+            final OutputCollector inferred = new OutputCollector();
+            final NTriplesParser parser = new NTriplesParser();
             parser.setRDFHandler(inferred);
             if (fs.isDirectory(path)) {
-                for (FileStatus status : fs.listStatus(path)) {
-                    String s = status.getPath().getName();
+                for (final FileStatus status : fs.listStatus(path)) {
+                    final String s = status.getPath().getName();
                     if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)
                             || s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
                         continue;
                     }
-                    BufferedReader br = new BufferedReader(
-                            new InputStreamReader(fs.open(status.getPath())));
+                    final BufferedReader br = new BufferedReader(
+                            new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8));
                     parser.parse(br, "");
                     br.close();
                }
@@ -306,8 +306,8 @@
         if (test.types.contains(TEST_ENTAILMENT)) {
             // Check expected inferences against the inferred triples and
             // the schema reasoner
-            for (Statement st : test.expected) {
-                Fact fact = new Fact(st);
+            for (final Statement st : test.expected) {
+                final Fact fact = new Fact(st);
                 if (!test.inferred.contains(st)
                         && !triviallyTrue(fact.getTriple(), schema)
                         && !schema.containsTriple(fact.getTriple())) {
@@ -317,8 +317,8 @@
         }
         // Non-entailment test: failure if non-expected triples inferred
         if (test.types.contains(TEST_NONENTAILMENT)) {
-            for (Statement st : test.unexpected) {
-                Fact fact = new Fact(st);
+            for (final Statement st : test.unexpected) {
+                final Fact fact = new Fact(st);
                 if (test.inferred.contains(st)
                         || schema.containsTriple(fact.getTriple())) {
                     test.error.add(st);
@@ -336,18 +336,18 @@
     * Query a connection for conformance tests matching a particular
     * test type.
     */
-    Set<Value> getTestURIs(RepositoryConnection conn, String testType)
+    Set<Value> getTestURIs(final RepositoryConnection conn, final String testType)
             throws IOException, OpenRDFException {
-        Set<Value> testURIs = new HashSet<>();
-        TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+        final Set<Value> testURIs = new HashSet<>();
+        final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
            "select ?test where { " +
            "?test <" + TYPE + "> <" + testType + "> .\n" +
            "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" +
            "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" +
            "}");
-        TupleQueryResult queryResult = query.evaluate();
+        final TupleQueryResult queryResult = query.evaluate();
         while (queryResult.hasNext()) {
-            BindingSet bindings = queryResult.next();
+            final BindingSet bindings = queryResult.next();
             testURIs.add(bindings.getValue("test"));
         }
         queryResult.close();
@@ -357,10 +357,10 @@
     /**
     * Query a connection for conformance test details.
     */
-    Collection<OwlTest> getTests(RepositoryConnection conn, Set<Value> testURIs)
+    Collection<OwlTest> getTests(final RepositoryConnection conn, final Set<Value> testURIs)
            throws IOException, OpenRDFException {
-        Map<Value, OwlTest> tests = new HashMap<>();
-        TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
+        final Map<Value, OwlTest> tests = new HashMap<>();
+        final TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL,
            "select * where { " +
            "?test <" + TYPE + "> ?testType .\n" +
            "?test <" + TEST_PREMISE + "> ?graph .\n" +
@@ -371,10 +371,10 @@
            "OPTIONAL {?test <" + TEST_CONCLUSION + "> ?conclusion .}\n" +
            "OPTIONAL {?test <" + TEST_NONCONCLUSION + "> ?nonentailed .}\n" +
            "}");
-        TupleQueryResult queryResult = query.evaluate();
+        final TupleQueryResult queryResult = query.evaluate();
         while (queryResult.hasNext()) {
-            BindingSet bindings = queryResult.next();
-            Value uri = bindings.getValue("test");
+            final BindingSet bindings = queryResult.next();
+            final Value uri = bindings.getValue("test");
             if (testURIs.contains(uri)) {
                 OwlTest test;
                 if (tests.containsKey(uri)) {
@@ -397,9 +397,9 @@
                 test.types.add(bindings.getValue("testType").stringValue());
             }
         }
-        for (OwlTest test : tests.values()) {
+        for (final OwlTest test : tests.values()) {
             if (test.compareTo != null) {
-                RDFXMLParser parser = new RDFXMLParser();
+                final RDFXMLParser parser = new RDFXMLParser();
                 parser.setRDFHandler(test);
                 parser.parse(new StringReader(test.compareTo), "");
             }
@@ -413,10 +413,10 @@
     * tests, such as an implicit "[bnode] type Ontology" triple or a
     * "[class] type Class" triple as long as the class exists.
     */
-    boolean triviallyTrue(Statement triple, Schema schema) {
-        Resource s = triple.getSubject();
-        URI p = triple.getPredicate();
-        Value o = triple.getObject();
+    boolean triviallyTrue(final Statement triple, final Schema schema) {
+        final Resource s = triple.getSubject();
+        final URI p = triple.getPredicate();
+        final Value o = triple.getObject();
         if (p.equals(RDF.TYPE)) {
             if (o.equals(OWL.ONTOLOGY)) {
                 return true;
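The reader change in ConformanceTest follows the same pattern as the others: an InputStreamReader built without a charset decodes with the platform default, so N-Triples output written as UTF-8 could be mis-read on a JVM whose default is, say, windows-1252. A minimal sketch of the fixed read path over HDFS (the path argument is an assumption, not from the commit):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsUtf8ReaderSketch {
        public static void main(final String[] args) throws IOException {
            final FileSystem fs = FileSystem.get(new Configuration());
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(fs.open(new Path(args[0])), StandardCharsets.UTF_8))) {
                br.lines().forEach(System.out::println);  // try-with-resources replaces the manual close()
            }
        }
    }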
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
index 489fd34..eac06c6 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -20,19 +20,12 @@ package org.apache.rya.accumulo.mr;
 */
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.resolver.RyaTripleContext;
-import org.apache.rya.api.resolver.triple.TripleRow;
-import org.apache.rya.api.resolver.triple.TripleRowResolverException;
-
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
@@ -43,6 +36,13 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
 import org.apache.spark.graphx.Edge;
 
 /**
@@ -63,8 +63,8 @@
     * @return A RecordReader that can be used to fetch RyaStatementWritables.
     */
    @Override
-    public RecordReader<Object, Edge> createRecordReader(InputSplit split,
-            TaskAttemptContext context) {
+    public RecordReader<Object, Edge> createRecordReader(final InputSplit split,
+            final TaskAttemptContext context) {
        return new RyaStatementRecordReader();
    }
@@ -77,7 +77,7 @@
     *            Statements will be read from the Rya table associated with
     *            this layout.
     */
-    public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+    public static void setTableLayout(final Job conf, final TABLE_LAYOUT layout) {
        conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
    }
@@ -89,8 +89,8 @@
        private RyaTripleContext ryaContext;
        private TABLE_LAYOUT tableLayout;
 
-        protected void setupIterators(TaskAttemptContext context,
-                Scanner scanner, String tableName, RangeInputSplit split) {
+        protected void setupIterators(final TaskAttemptContext context,
+                final Scanner scanner, final String tableName, final RangeInputSplit split) {
        }
@@ -104,7 +104,7 @@
         *             if thrown by the superclass's initialize method.
         */
        @Override
-        public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+        public void initialize(final InputSplit inSplit, final TaskAttemptContext attempt)
                throws IOException {
            super.initialize(inSplit, attempt);
            this.tableLayout = MRUtils.getTableLayout(
@@ -127,15 +127,16 @@
         */
        @Override
        public boolean nextKeyValue() throws IOException {
-            if (!scannerIterator.hasNext())
-                return false;
-            Entry<Key, Value> entry = scannerIterator.next();
+            if (!scannerIterator.hasNext()) {
+                return false;
+            }
+            final Entry<Key, Value> entry = scannerIterator.next();
            ++numKeysRead;
            currentKey = entry.getKey();
            try {
                currentK = currentKey.getRow();
-                RyaTypeWritable rtw = new RyaTypeWritable();
-                RyaStatement stmt = this.ryaContext.deserializeTriple(
+                final RyaTypeWritable rtw = new RyaTypeWritable();
+                final RyaStatement stmt = this.ryaContext.deserializeTriple(
                        this.tableLayout, new TripleRow(entry.getKey().getRow()
                                .getBytes(), entry.getKey().getColumnFamily()
                                .getBytes(), entry.getKey()
@@ -144,28 +145,28 @@
                                .getColumnVisibility().getBytes(), entry
                                .getValue().get()));
 
-                long subHash = getVertexId(stmt.getSubject());
-                long objHash = getVertexId(stmt.getObject());
+                final long subHash = getVertexId(stmt.getSubject());
+                final long objHash = getVertexId(stmt.getObject());
                rtw.setRyaType(stmt.getPredicate());
 
-                Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
+                final Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
                        subHash, objHash, rtw);
                currentV = writable;
-            } catch (TripleRowResolverException e) {
+            } catch (final TripleRowResolverException e) {
                throw new IOException(e);
            }
            return true;
        }
 
        protected List<IteratorSetting> contextIterators(
-                TaskAttemptContext context, String tableName) {
+                final TaskAttemptContext context, final String tableName) {
            return getIterators(context);
        }
 
        @Override
-        protected void setupIterators(TaskAttemptContext context,
-                Scanner scanner, String tableName,
-                org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+        protected void setupIterators(final TaskAttemptContext context,
+                final Scanner scanner, final String tableName,
+                final org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
            List<IteratorSetting> iterators = null;
 
            if (null == split) {
@@ -177,13 +178,14 @@
                }
            }
 
-            for (IteratorSetting iterator : iterators)
-                scanner.addScanIterator(iterator);
+            for (final IteratorSetting iterator : iterators) {
+                scanner.addScanIterator(iterator);
+            }
        }
    }
 
-    public static long getVertexId(RyaType resource) throws IOException {
+    public static long getVertexId(final RyaType resource) throws IOException {
        String uri = "";
        if (resource != null) {
            uri = resource.getData().toString();
@@ -193,20 +195,20 @@
            // the digested string, the collision ratio is less than 0.0001%
            // using custom hash function should significantly reduce the
            // collision ratio
-            MessageDigest messageDigest = MessageDigest
+            final MessageDigest messageDigest = MessageDigest
                    .getInstance("SHA-256");
-            messageDigest.update(uri.getBytes());
-            String encryptedString = new String(messageDigest.digest());
+            messageDigest.update(uri.getBytes(StandardCharsets.UTF_8));
+            final String encryptedString = new String(messageDigest.digest(), StandardCharsets.UTF_8);
            return hash(encryptedString);
        }
-        catch (NoSuchAlgorithmException e) {
+        catch (final NoSuchAlgorithmException e) {
            throw new IOException(e);
        }
    }
 
-    public static long hash(String string) {
+    public static long hash(final String string) {
        long h = 1125899906842597L; // prime
-        int len = string.length();
+        final int len = string.length();
 
        for (int i = 0; i < len; i++) {
            h = 31 * h + string.charAt(i);
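One caveat the GraphXEdgeInputFormat change does not remove (an observation, not part of the commit): decoding raw SHA-256 output with new String(..., UTF_8) is lossy, because byte sequences that are invalid UTF-8 are replaced with U+FFFD, so distinct digests can decode to the same string before hash() runs. The explicit charset makes that behavior consistent across JVMs, which is what the commit targets; a lossless variant would fold the digest bytes directly, sketched here with hypothetical names:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class VertexIdSketch {
        static long vertexIdLossless(final String uri) throws NoSuchAlgorithmException {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(uri.getBytes(StandardCharsets.UTF_8));
            long h = 1125899906842597L;  // same prime seed as hash() above
            for (final byte b : digest) {
                h = 31 * h + b;  // fold the raw bytes; no string decode step at all
            }
            return h;
        }
    }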
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538cfccc/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
index 5332260..0d42df2 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
@@ -22,6 +22,7 @@ package org.apache.rya.accumulo.mr;
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
@@ -94,7 +95,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param visibility A comma-separated list of authorizations.
      */
-    public static void setDefaultVisibility(Job job, String visibility) {
+    public static void setDefaultVisibility(final Job job, final String visibility) {
         if (visibility != null) {
             job.getConfiguration().set(CV_PROPERTY, visibility);
         }
@@ -107,7 +108,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param context A context string, should be a syntactically valid URI.
      */
-    public static void setDefaultContext(Job job, String context) {
+    public static void setDefaultContext(final Job job, final String context) {
         if (context != null) {
             job.getConfiguration().set(CONTEXT_PROPERTY, context);
         }
@@ -118,7 +119,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param prefix The common prefix to all rya tables that output will be written to.
      */
-    public static void setTablePrefix(Job job, String prefix) {
+    public static void setTablePrefix(final Job job, final String prefix) {
         job.getConfiguration().set(OUTPUT_PREFIX_PROPERTY, prefix);
     }
 
@@ -127,7 +128,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param enable Whether this job should add its output statements to the free text index.
      */
-    public static void setFreeTextEnabled(Job job, boolean enable) {
+    public static void setFreeTextEnabled(final Job job, final boolean enable) {
         job.getConfiguration().setBoolean(ENABLE_FREETEXT, enable);
     }
 
@@ -136,7 +137,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param enable Whether this job should add its output statements to the temporal index.
      */
-    public static void setTemporalEnabled(Job job, boolean enable) {
+    public static void setTemporalEnabled(final Job job, final boolean enable) {
         job.getConfiguration().setBoolean(ENABLE_TEMPORAL, enable);
     }
 
@@ -145,7 +146,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param enable Whether this job should add its output statements to the entity-centric index.
      */
-    public static void setEntityEnabled(Job job, boolean enable) {
+    public static void setEntityEnabled(final Job job, final boolean enable) {
         job.getConfiguration().setBoolean(ENABLE_ENTITY, enable);
     }
 
@@ -154,7 +155,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to apply the setting to.
      * @param enable Whether this job should output to the core tables.
      */
-    public static void setCoreTablesEnabled(Job job, boolean enable) {
+    public static void setCoreTablesEnabled(final Job job, final boolean enable) {
         job.getConfiguration().setBoolean(ENABLE_CORE, enable);
     }
 
@@ -163,7 +164,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @param job Job to configure
      * @param instance Name of the mock instance
      */
-    public static void setMockInstance(Job job, String instance) {
+    public static void setMockInstance(final Job job, final String instance) {
         AccumuloOutputFormat.setMockInstance(job, instance);
         job.getConfiguration().setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
         job.getConfiguration().setBoolean(MRUtils.AC_MOCK_PROP, true);
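The static setters above are the job-facing configuration surface of RyaOutputFormat. A minimal usage sketch, assuming a vanilla Hadoop Job; the prefix and visibility values are placeholders, not defaults from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.rya.accumulo.mr.RyaOutputFormat;

    public class RyaOutputJobSketch {
        public static Job configure() throws Exception {
            final Job job = Job.getInstance(new Configuration(), "rya-output-sketch");
            job.setOutputFormatClass(RyaOutputFormat.class);
            RyaOutputFormat.setTablePrefix(job, "rya_");    // hypothetical prefix
            RyaOutputFormat.setDefaultVisibility(job, "U"); // hypothetical authorizations
            RyaOutputFormat.setFreeTextEnabled(job, false);
            RyaOutputFormat.setTemporalEnabled(job, false);
            RyaOutputFormat.setEntityEnabled(job, true);
            RyaOutputFormat.setCoreTablesEnabled(job, true);
            return job;
        }
    }

Note that the ENABLE_* lookups later in this file default to true, so a job that sets none of the index flags gets every indexer.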
@@ -175,8 +176,8 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @throws IOException if initializing the core Rya indexer fails.
      */
     @Override
-    public void checkOutputSpecs(JobContext jobContext) throws IOException {
-        Configuration conf = jobContext.getConfiguration();
+    public void checkOutputSpecs(final JobContext jobContext) throws IOException {
+        final Configuration conf = jobContext.getConfiguration();
         // make sure that all of the indexers can connect
         getFreeTextIndexer(conf);
         getTemporalIndexer(conf);
@@ -189,7 +190,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @return A committer whose method implementations are empty.
      */
     @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+    public OutputCommitter getOutputCommitter(final TaskAttemptContext context) throws IOException, InterruptedException {
         // copied from AccumuloOutputFormat
         return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context);
     }
@@ -201,16 +202,16 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
      * @throws IOException if any enabled indexers can't be initialized
      */
     @Override
-    public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
+    public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(final TaskAttemptContext context) throws IOException {
         return new RyaRecordWriter(context);
     }
 
-    private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException {
+    private static FreeTextIndexer getFreeTextIndexer(final Configuration conf) throws IOException {
         if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
             return null;
         }
-        AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
+        final AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
         freeText.setConf(conf);
         Connector connector;
         try {
@@ -218,7 +219,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
         } catch (AccumuloException | AccumuloSecurityException e) {
             throw new IOException("Error when attempting to create a connection for writing the freeText index.", e);
         }
-        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+        final MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
         freeText.setConnector(connector);
         freeText.setMultiTableBatchWriter(mtbw);
         freeText.init();
@@ -226,11 +227,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
         return freeText;
     }
 
-    private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException {
+    private static TemporalIndexer getTemporalIndexer(final Configuration conf) throws IOException {
         if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
             return null;
         }
-        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
         temporal.setConf(conf);
         Connector connector;
         try {
@@ -238,34 +239,34 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
         } catch (AccumuloException | AccumuloSecurityException e) {
             throw new IOException("Error when attempting to create a connection for writing the temporal index.", e);
         }
-        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
+        final MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig());
         temporal.setConnector(connector);
         temporal.setMultiTableBatchWriter(mtbw);
         temporal.init();
         return temporal;
     }
 
-    private static EntityCentricIndex getEntityIndexer(Configuration conf) {
+    private static EntityCentricIndex getEntityIndexer(final Configuration conf) {
         if (!conf.getBoolean(ENABLE_ENTITY, true)) {
             return null;
         }
-        EntityCentricIndex entity = new EntityCentricIndex();
+        final EntityCentricIndex entity = new EntityCentricIndex();
         entity.setConf(conf);
         return entity;
     }
 
-    private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException {
+    private static AccumuloRyaDAO getRyaIndexer(final Configuration conf) throws IOException {
         try {
            if (!conf.getBoolean(ENABLE_CORE, true)) {
                 return null;
             }
-            AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
-            Connector conn = ConfigUtils.getConnector(conf);
+            final AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
+            final Connector conn = ConfigUtils.getConnector(conf);
             ryaIndexer.setConnector(conn);
 
-            AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
 
-            String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null);
+            final String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null);
             if (tablePrefix != null) {
                 ryaConf.setTablePrefix(tablePrefix);
             }
@@ -273,13 +274,13 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
             ryaIndexer.setConf(ryaConf);
             ryaIndexer.init();
             return ryaIndexer;
-        } catch (AccumuloException e) {
+        } catch (final AccumuloException e) {
             logger.error("Cannot create RyaIndexer", e);
             throw new IOException(e);
-        } catch (AccumuloSecurityException e) {
+        } catch (final AccumuloSecurityException e) {
             logger.error("Cannot create RyaIndexer", e);
             throw new IOException(e);
-        } catch (RyaDAOException e) {
+        } catch (final RyaDAOException e) {
             logger.error("Cannot create RyaIndexer", e);
             throw new IOException(e);
         }
@@ -293,11 +294,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
             implements Closeable, Flushable {
         private static final Logger logger = Logger.getLogger(RyaRecordWriter.class);
 
-        private FreeTextIndexer freeTextIndexer;
-        private TemporalIndexer temporalIndexer;
-        private EntityCentricIndex entityIndexer;
-        private AccumuloRyaDAO ryaIndexer;
-        private RyaTripleContext tripleContext;
+        private final FreeTextIndexer freeTextIndexer;
+        private final TemporalIndexer temporalIndexer;
+        private final EntityCentricIndex entityIndexer;
+        private final AccumuloRyaDAO ryaIndexer;
+        private final RyaTripleContext tripleContext;
         private MultiTableBatchWriter writer;
         private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
         private RyaURI defaultContext = null;
@@ -305,10 +306,10 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
         private static final long ONE_MEGABYTE = 1024L * 1024L;
         private static final long AVE_STATEMENT_SIZE = 100L;
 
-        private long bufferSizeLimit;
+        private final long bufferSizeLimit;
         private long bufferCurrentSize = 0;
 
-        private ArrayList<RyaStatement> buffer;
+        private final ArrayList<RyaStatement> buffer;
 
         /**
          * Constructor.
@@ -316,7 +317,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @throws IOException if the core Rya indexer or entity indexer can't
          *         be initialized
          */
-        public RyaRecordWriter(TaskAttemptContext context) throws IOException {
+        public RyaRecordWriter(final TaskAttemptContext context) throws IOException {
             this(context.getConfiguration());
         }
 
@@ -326,21 +327,21 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @throws IOException if the core Rya indexer or entity indexer can't
          *         be initialized
          */
-        public RyaRecordWriter(Configuration conf) throws IOException {
+        public RyaRecordWriter(final Configuration conf) throws IOException {
             // set the visibility
-            String visibility = conf.get(CV_PROPERTY);
+            final String visibility = conf.get(CV_PROPERTY);
             if (visibility != null) {
-                cv = visibility.getBytes();
+                cv = visibility.getBytes(StandardCharsets.UTF_8);
             }
             // set the default context
-            String context = conf.get(CONTEXT_PROPERTY, "");
+            final String context = conf.get(CONTEXT_PROPERTY, "");
             if (context != null && !context.isEmpty()) {
                 defaultContext = new RyaURI(context);
             }
             // set up the buffer
             bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE);
-            int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
+            final int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
             buffer = new ArrayList<RyaStatement>(bufferCapacity);
             // set up the indexers
@@ -358,7 +359,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
             } catch (AccumuloException | AccumuloSecurityException e) {
                 throw new IOException("Error connecting to Accumulo for entity index output", e);
             }
-            BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+            final BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
             batchWriterConfig.setMaxMemory(RdfCloudTripleStoreConstants.MAX_MEMORY);
             batchWriterConfig.setTimeout(RdfCloudTripleStoreConstants.MAX_TIME, TimeUnit.MILLISECONDS);
             batchWriterConfig.setMaxWriteThreads(RdfCloudTripleStoreConstants.NUM_THREADS);
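A worked example of the buffer sizing in the constructor hunk above, using only the defaults visible in the diff (a one-megabyte limit and a 100-byte average statement):

    public class BufferSizingExample {
        public static void main(final String[] args) {
            final long bufferSizeLimit = 1024L * 1024L; // ONE_MEGABYTE default
            final long aveStatementSize = 100L;         // AVE_STATEMENT_SIZE
            // Same computation as the constructor: the initial ArrayList capacity.
            final int bufferCapacity = (int) (bufferSizeLimit / aveStatementSize);
            System.out.println(bufferCapacity);         // prints 10485
        }
    }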
@@ -396,41 +397,45 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @param paramTaskAttemptContext Unused.
          */
         @Override
-        public void close(TaskAttemptContext paramTaskAttemptContext) {
+        public void close(final TaskAttemptContext paramTaskAttemptContext) {
             // close everything. log errors
             try {
                 flush();
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 logger.error("Error flushing the buffer on RyaOutputFormat Close", e);
             }
             try {
-                if (freeTextIndexer != null)
+                if (freeTextIndexer != null) {
                     freeTextIndexer.close();
-            } catch (IOException e) {
+                }
+            } catch (final IOException e) {
                 logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", e);
             }
             try {
-                if (temporalIndexer != null)
+                if (temporalIndexer != null) {
                     temporalIndexer.close();
-            } catch (IOException e) {
+                }
+            } catch (final IOException e) {
                 logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", e);
             }
             try {
-                if (entityIndexer != null)
+                if (entityIndexer != null) {
                     entityIndexer.close();
-            } catch (IOException e) {
+                }
+            } catch (final IOException e) {
                 logger.error("Error closing the entityIndexer on RyaOutputFormat Close", e);
             }
             try {
-                if (ryaIndexer != null)
+                if (ryaIndexer != null) {
                     ryaIndexer.destroy();
-            } catch (RyaDAOException e) {
+                }
+            } catch (final RyaDAOException e) {
                 logger.error("Error closing RyaDAO on RyaOutputFormat Close", e);
             }
             if (writer != null) {
                 try {
                     writer.close();
-                } catch (MutationsRejectedException e) {
+                } catch (final MutationsRejectedException e) {
                     logger.error("Error closing MultiTableBatchWriter on RyaOutputFormat Close", e);
                 }
             }
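The reworked close() above keeps one try/catch per resource so that a failure closing one indexer cannot prevent the others from being closed. The same pattern can be factored into a helper; a minimal sketch with a hypothetical helper name, assuming the indexer is a Closeable whose close() throws IOException (as the catch blocks above suggest):

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.log4j.Logger;

    public final class QuietCloser {
        private static final Logger logger = Logger.getLogger(QuietCloser.class);

        private QuietCloser() {
        }

        // Close a possibly-null resource, logging instead of propagating, so one
        // failed close cannot short-circuit the remaining ones.
        public static void closeQuietly(final Closeable resource, final String name) {
            try {
                if (resource != null) {
                    resource.close();
                }
            } catch (final IOException e) {
                logger.error("Error closing " + name, e);
            }
        }
    }

ryaIndexer.destroy() and writer.close() throw different checked exceptions, so they would keep their own try blocks, as the patch does.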
@@ -442,7 +447,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @param statement Statement to insert to Rya.
          * @throws IOException if writing to Accumulo fails.
          */
-        public void write(Statement statement) throws IOException {
+        public void write(final Statement statement) throws IOException {
             write(RdfToRyaConversions.convertStatement(statement));
         }
 
@@ -452,7 +457,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @param ryaStatement Statement to insert to Rya.
          * @throws IOException if writing to Accumulo fails.
          */
-        public void write(RyaStatement ryaStatement) throws IOException {
+        public void write(final RyaStatement ryaStatement) throws IOException {
             write(NullWritable.get(), new RyaStatementWritable(ryaStatement, tripleContext));
         }
 
@@ -464,8 +469,8 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
          * @throws IOException if writing to Accumulo fails.
          */
         @Override
-        public void write(Writable key, RyaStatementWritable value) throws IOException {
-            RyaStatement ryaStatement = value.getRyaStatement();
+        public void write(final Writable key, final RyaStatementWritable value) throws IOException {
+            final RyaStatement ryaStatement = value.getRyaStatement();
             if (ryaStatement.getColumnVisibility() == null) {
                 ryaStatement.setColumnVisibility(cv);
             }
@@ -479,11 +484,11 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
             }
         }
 
-        private int statementSize(RyaStatement ryaStatement) {
-            RyaURI subject = ryaStatement.getSubject();
-            RyaURI predicate = ryaStatement.getPredicate();
-            RyaType object = ryaStatement.getObject();
-            RyaURI context = ryaStatement.getContext();
+        private int statementSize(final RyaStatement ryaStatement) {
+            final RyaURI subject = ryaStatement.getSubject();
+            final RyaURI predicate = ryaStatement.getPredicate();
+            final RyaType object = ryaStatement.getObject();
+            final RyaURI context = ryaStatement.getContext();
             int size = 3 + subject.getData().length() + predicate.getData().length() + object.getData().length();
             if (!XMLSchema.ANYURI.equals(object.getDataType())) {
                 size += 2 + object.getDataType().toString().length();
             }
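A worked example of the statementSize() estimate above, with hypothetical lengths: subject, predicate, and object data of 30, 28, and 5 characters plus a non-anyURI datatype whose string form is 33 characters give 3 + 30 + 28 + 5 = 66, then 2 + 33 = 35 more for the datatype, for an estimate of 101 bytes:

    public class StatementSizeExample {
        public static void main(final String[] args) {
            // Hypothetical lengths for subject, predicate, object, and datatype.
            final int subjectLen = 30;
            final int predicateLen = 28;
            final int objectLen = 5;
            final int datatypeLen = 33; // not xsd:anyURI, so it is counted
            int size = 3 + subjectLen + predicateLen + objectLen;
            size += 2 + datatypeLen;
            System.out.println(size); // prints 101
        }
    }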
@@ -508,15 +513,15 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
             totalCommitRecords += buffer.size();
             commitCount++;
 
-            long startCommitTime = System.currentTimeMillis();
+            final long startCommitTime = System.currentTimeMillis();
             logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", commitCount,
                     buffer.size(), bufferCurrentSize));
 
-            double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.;
+            final double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.;
             totalReadDuration += readingDuration;
-            double currentReadRate = buffer.size() / readingDuration;
-            double totalReadRate = totalCommitRecords / totalReadDuration;
+            final double currentReadRate = buffer.size() / readingDuration;
+            final double totalReadRate = totalCommitRecords / totalReadDuration;
 
             // Print "reading" metrics
             logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration,
@@ -539,7 +544,7 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
                 entityIndexer.storeStatements(buffer);
                 try {
                     writer.flush();
-                } catch (MutationsRejectedException e) {
+                } catch (final MutationsRejectedException e) {
                     throw new IOException("Error flushing data to Accumulo for entity indexing", e);
                 }
             }
@@ -549,26 +554,26 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable
                 if (ryaIndexer != null) {
                     ryaIndexer.add(buffer.iterator());
                 }
-            } catch (RyaDAOException e) {
+            } catch (final RyaDAOException e) {
                 logger.error("Cannot write statement to Rya", e);
                 throw new IOException(e);
             }
             lastCommitFinishTime = System.currentTimeMillis();
 
-            double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.;
+            final double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.;
             totalWriteDuration += writingDuration;
-            double currentWriteRate = buffer.size() / writingDuration;
-            double totalWriteRate = totalCommitRecords / totalWriteDuration;
+            final double currentWriteRate = buffer.size() / writingDuration;
+            final double totalWriteRate = totalCommitRecords / totalWriteDuration;
 
             // Print "writing" stats
             logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration,
                     currentWriteRate, totalWriteRate));
 
-            double processDuration = writingDuration + readingDuration;
-            double totalProcessDuration = totalWriteDuration + totalReadDuration;
-            double currentProcessRate = buffer.size() / processDuration;
-            double totalProcessRate = totalCommitRecords / (totalProcessDuration);
+            final double processDuration = writingDuration + readingDuration;
+            final double totalProcessDuration = totalWriteDuration + totalReadDuration;
+            final double currentProcessRate = buffer.size() / processDuration;
+            final double totalProcessRate = totalCommitRecords / (totalProcessDuration);
 
             // Print "total" stats
             logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration,
