http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloQueryRuleset.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloQueryRuleset.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloQueryRuleset.java new file mode 100644 index 0000000..1b95ea6 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloQueryRuleset.java @@ -0,0 +1,203 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.mapreduce.InputTableConfig; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.io.Text; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; + +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.query.strategy.ByteRange; +import mvm.rya.api.query.strategy.TriplePatternStrategy; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.utils.NullableStatementImpl; + +/** + * A {@link QueryRuleset} that additionally maps rules to ranges in Accumulo tables. Also enables + * copying one or more entire tables, independent of the query-derived rules. + */ +public class AccumuloQueryRuleset extends QueryRuleset { + private Map<TABLE_LAYOUT, List<Range>> tableRanges = new HashMap<>(); + private List<String> entireTables = new LinkedList<String>(); + private RyaTripleContext ryaContext; + + /** + * Constructs the ruleset and the associated Ranges, given a Configuration that contains a SPARQL query. + * @param conf Configuration including the query and connection information. 
+ * @throws IOException if the range can't be resolved + * @throws QueryRulesetException if the query can't be translated to valid rules + */ + public AccumuloQueryRuleset(RdfCloudTripleStoreConfiguration conf) throws IOException, QueryRulesetException { + // Extract StatementPatterns and conditions from the query + super(conf); + // Turn StatementPatterns into Ranges + ryaContext = RyaTripleContext.getInstance(conf); + for (CopyRule rule : rules) { + StatementPattern sp = rule.getStatement(); + Map.Entry<TABLE_LAYOUT, ByteRange> entry = getRange(sp); + TABLE_LAYOUT layout = entry.getKey(); + ByteRange byteRange = entry.getValue(); + Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); + if (!tableRanges.containsKey(layout)) { + tableRanges.put(layout, new LinkedList<Range>()); + } + tableRanges.get(layout).add(range); + } + } + + /** + * Turn a single StatementPattern into a Range. + * @param conf + * @throws IOException if the range can't be resolved + */ + private Map.Entry<TABLE_LAYOUT, ByteRange> getRange(StatementPattern sp) throws IOException { + Var context = sp.getContextVar(); + Statement stmt = new NullableStatementImpl((Resource) sp.getSubjectVar().getValue(), + (URI) sp.getPredicateVar().getValue(), sp.getObjectVar().getValue(), + context == null ? null : (Resource) context.getValue()); + RyaStatement rs = RdfToRyaConversions.convertStatement(stmt); + TriplePatternStrategy strategy = ryaContext.retrieveStrategy(rs); + Map.Entry<TABLE_LAYOUT, ByteRange> entry = + strategy.defineRange(rs.getSubject(), rs.getPredicate(), rs.getObject(), rs.getContext(), conf); + return entry; + } + + /** + * Add an instruction to select an entire table, with no restricting rule. 
+ */ + public void addTable(String tableName) { + entireTables.add(tableName); + } + + /** + * Get table names and input configurations for each range + * @return A Map representing each table and {@link InputTableConfig} needed to get all the rows that match the rules. + */ + public Map<String, InputTableConfig> getInputConfigs() { + Map<String, InputTableConfig> configs = new HashMap<>(); + for (TABLE_LAYOUT layout : tableRanges.keySet()) { + String parentTable = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, conf.getTablePrefix()); + InputTableConfig config = new InputTableConfig(); + config.setRanges(tableRanges.get(layout)); + configs.put(parentTable, config); + } + for (String tableName : entireTables) { + InputTableConfig config = new InputTableConfig(); + List<Range> ranges = new LinkedList<>(); + ranges.add(new Range()); + config.setRanges(ranges); + configs.put(tableName, config); + } + return configs; + } + + /** + * Get the rules that apply to all statements within a Range. The range may not + * contain every row relevant to the associated rule(s), but every row within the + * range is relevant to the rule(s). + * @param layout Defines which table the range is meant to scan + * @param range The Range of rows in that table + * @return Any rules in this ruleset that match the given table and contain the given range + * @throws IOException if the Range can't be resolved + */ + public List<CopyRule> getRules(TABLE_LAYOUT layout, Range range) throws IOException { + List<CopyRule> matchingRules = new LinkedList<>(); + for (CopyRule rule : rules) { + // Compare the rule to the given range + Map.Entry<TABLE_LAYOUT, ByteRange> entry = getRange(rule.getStatement()); + TABLE_LAYOUT ruleLayout = entry.getKey(); + // If they apply to different tables, they are unrelated. + if (!ruleLayout.equals(layout)) { + continue; + } + // If the given range is contained in (or equal to) the rule's range, then the + // rule matches and should be included. 
+ ByteRange byteRange = entry.getValue(); + Range ruleRange = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd())); + if (rangeContainsRange(ruleRange, range)) { + matchingRules.add(rule); + } + } + return matchingRules; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(super.toString()); + for (String fullTableName : entireTables) { + sb.append("\n\tCopy entire table ").append(fullTableName).append("\n"); + } + return sb.toString(); + } + + private static boolean rangeContainsRange(Range r1, Range r2) { + // 1. If r1.start is infinite, r1 contains r2.start + if (!r1.isInfiniteStartKey()) { + // 2. Otherwise, if r2.start is infinite, r1 can't contain r2 + if (r2.isInfiniteStartKey()) { + return false; + } + Key start2 = r2.getStartKey(); + // 3. If r2 is inclusive, r1 needs to contain r2's start key. + if (r2.isStartKeyInclusive()) { + if (!r1.contains(start2)) { + return false; + } + } + // 4. Otherwise, the only failure is if r2's start key comes first (they can be equal) + else if (start2.compareTo(r1.getStartKey()) < 0) { + return false; + } + } + // Similar logic for end points + if (!r1.isInfiniteStopKey()) { + if (r2.isInfiniteStopKey()) { + return false; + } + Key end2 = r2.getEndKey(); + if (r2.isEndKeyInclusive()) { + if (!r1.contains(end2)) { + return false; + } + } + else if (end2.compareTo(r1.getEndKey()) > 0) { + return false; + } + } + return true; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java new file mode 100644 index 0000000..daead31 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloRyaUtils.java @@ -0,0 +1,667 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.RegExFilter; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.openrdf.model.Literal; +import org.openrdf.model.ValueFactory; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RdfToRyaConversions; +import 
mvm.rya.indexing.accumulo.ConfigUtils; + +/** + * Utility methods for an Accumulo Rya instance. + */ +public final class AccumuloRyaUtils { + private static final Logger log = Logger.getLogger(AccumuloRyaUtils.class); + + public static final String COPY_TOOL_RUN_TIME_LOCAL_NAME = "copy_tool_run_time"; + public static final String COPY_TOOL_SPLIT_TIME_LOCAL_NAME = "copy_tool_split_time"; + public static final String COPY_TOOL_TIME_OFFSET_LOCAL_NAME = "copy_tool_time_offset"; + + /** + * ISO8601 time format. + */ + private static final SimpleDateFormat TIME_FORMATTER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"); + private static final Date DEFAULT_DATE = new Date(0); + + private static final String NAMESPACE = RdfCloudTripleStoreConstants.NAMESPACE; + private static final ValueFactory VALUE_FACTORY = RdfCloudTripleStoreConstants.VALUE_FACTORY; + public static RyaURI RTS_SUBJECT_RYA = RdfCloudTripleStoreConstants.RTS_SUBJECT_RYA; + public static RyaURI RTS_COPY_TOOL_RUN_TIME_PREDICATE_RYA = createRyaUri(COPY_TOOL_RUN_TIME_LOCAL_NAME); + public static RyaURI RTS_COPY_TOOL_SPLIT_TIME_PREDICATE_RYA = createRyaUri(COPY_TOOL_SPLIT_TIME_LOCAL_NAME); + public static RyaURI RTS_TIME_OFFSET_PREDICATE_RYA = createRyaUri(COPY_TOOL_TIME_OFFSET_LOCAL_NAME); + + /** + * Ignore the meta statements indicating the Rya version and copy time values. + */ + public static final ImmutableSet<IteratorSetting> COMMON_REG_EX_FILTER_SETTINGS = ImmutableSet.of( + getVersionRegExFilterSetting(), + getCopyToolRunTimeRegExFilterSetting(), + getCopyToolSplitTimeRegExFilterSetting(), + getCopyToolTimeOffsetRegExFilterSetting() + ); + + /** + * Private constructor to prevent instantiation. + */ + private AccumuloRyaUtils() { + } + + /** + * Creates a {@link RyaURI} for the specified local name. + * @param localName the URI's local name. + * @return the {@link RyraURI}. 
+ */ + public static RyaURI createRyaUri(String localName) { + return createRyaUri(NAMESPACE, localName); + } + + /** + * Creates a {@link RyaURI} for the specified local name. + * @param namespace the namespace. + * @param localName the URI's local name. + * @return the {@link RyraURI}. + */ + public static RyaURI createRyaUri(String namespace, String localName) { + return RdfToRyaConversions.convertURI(VALUE_FACTORY.createURI(namespace, localName)); + } + + /** + * Creates a copy tool run time {@link RyaStatement} from the specified {@link Date}. + * @param date the copy tool run time {@link Date}. + * @return the {@link RyaStatement} for the copy tool run time. + */ + public static RyaStatement createCopyToolRunTimeRyaStatement(Date date) { + Literal literal = VALUE_FACTORY.createLiteral(date != null ? date : DEFAULT_DATE); + RyaType timeObject = new RyaType(literal.getDatatype(), literal.stringValue()); + return new RyaStatement(RTS_SUBJECT_RYA, RTS_COPY_TOOL_RUN_TIME_PREDICATE_RYA, timeObject); + } + + /** + * Creates a copy tool split time {@link RyaStatement} from the specified {@link Date}. + * @param date the copy tool split time {@link Date}. + * @return the {@link RyaStatement} for the copy tool split time. + */ + public static RyaStatement createCopyToolSplitTimeRyaStatement(Date date) { + Literal literal = VALUE_FACTORY.createLiteral(date != null ? date : DEFAULT_DATE); + RyaType timeObject = new RyaType(literal.getDatatype(), literal.stringValue()); + return new RyaStatement(RTS_SUBJECT_RYA, RTS_COPY_TOOL_SPLIT_TIME_PREDICATE_RYA, timeObject); + } + + /** + * Gets the copy tool run {@link Date} metadata for the table. + * @param dao the {@link AccumuloRyaDAO}. + * @return the copy tool run {@link Date}. 
+ * @throws RyaDAOException + */ + public static Date getCopyToolRunDate(AccumuloRyaDAO dao) throws RyaDAOException { + String time = getCopyToolRunTime(dao); + Date date = null; + if (time != null) { + try { + date = TIME_FORMATTER.parse(time); + } catch (ParseException e) { + log.error("Unable to parse the copy tool run time found in table: " + time, e); + } + } + return date; + } + + /** + * Gets the copy tool split {@link Date} metadata for the table. + * @param dao the {@link AccumuloRyaDAO}. + * @return the copy tool split {@link Date}. + * @throws RyaDAOException + */ + public static Date getCopyToolSplitDate(AccumuloRyaDAO dao) throws RyaDAOException { + String time = getCopyToolSplitTime(dao); + Date date = null; + if (time != null) { + try { + date = TIME_FORMATTER.parse(time); + } catch (ParseException e) { + log.error("Unable to parse the copy tool split time found in table: " + time, e); + } + } + return date; + } + + /** + * Gets the copy tool run time metadata for the table. + * @param dao the {@link AccumuloRyaDAO}. + * @return the copy tool run time value. + * @throws RyaDAOException + */ + public static String getCopyToolRunTime(AccumuloRyaDAO dao) throws RyaDAOException { + return getMetadata(RTS_COPY_TOOL_RUN_TIME_PREDICATE_RYA, dao); + } + + /** + * Gets the copy tool split time metadata for the table. + * @param dao the {@link AccumuloRyaDAO}. + * @return the copy tool split time value. + * @throws RyaDAOException + */ + public static String getCopyToolSplitTime(AccumuloRyaDAO dao) throws RyaDAOException { + return getMetadata(RTS_COPY_TOOL_SPLIT_TIME_PREDICATE_RYA, dao); + } + + /** + * Gets the metadata key from the table. + * @param ryaStatement the {@link RyaStatement} for the metadata key to query. + * @param dao the {@link AccumuloRyaDAO}. + * @return the string value of the object from the metadata key. 
+ * @throws RyaDAOException + */ + private static String getMetadata(RyaURI predicateRyaUri, AccumuloRyaDAO dao) throws RyaDAOException { + RyaStatement ryaStatement = new RyaStatement(RTS_SUBJECT_RYA, predicateRyaUri, null); + return getMetadata(ryaStatement, dao); + } + + /** + * Gets the metadata key from the table. + * @param ryaStatement the {@link RyaStatement} for the metadata key to query. + * @param dao the {@link AccumuloRyaDAO}. + * @return the string value of the object from the metadata key. + * @throws RyaDAOException + */ + private static String getMetadata(RyaStatement ryaStatement, AccumuloRyaDAO dao) throws RyaDAOException { + String metadata = null; + AccumuloRdfConfiguration config = dao.getConf(); + CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(ryaStatement, config); + if (iter.hasNext()) { + metadata = iter.next().getObject().getData(); + } + iter.close(); + + return metadata; + } + + /** + * Sets the copy tool run {@link Date} metadata key for the table. + * @param date the copy tool run time {@link Date}. + * @param dao the {@link AccumuloRyaDAO}. + * @throws RyaDAOException + */ + public static RyaStatement setCopyToolRunDate(Date date, AccumuloRyaDAO dao) throws RyaDAOException { + RyaStatement ryaStatement = createCopyToolRunTimeRyaStatement(date); + dao.add(ryaStatement); + return ryaStatement; + } + + /** + * Sets the copy tool split {@link Date} metadata key for the table. + * @param date the tool copy split time {@link Date}. + * @param dao the {@link AccumuloRyaDAO}. + * @throws RyaDAOException + */ + public static RyaStatement setCopyToolSplitDate(Date date, AccumuloRyaDAO dao) throws RyaDAOException { + RyaStatement ryaStatement = createCopyToolSplitTimeRyaStatement(date); + dao.add(ryaStatement); + return ryaStatement; + } + + /** + * Sets the copy tool run time metadata key for the table. + * @param time the copy tool run time. + * @param dao the {@link AccumuloRyaDAO}. 
+ * @throws RyaDAOException + */ + public static RyaStatement setCopyToolRunTime(String time, AccumuloRyaDAO dao) throws RyaDAOException { + Date date = null; + try { + date = TIME_FORMATTER.parse(time); + } catch (ParseException e) { + log.error("Unable to parse the copy tool run time: ", e); + } + return setCopyToolRunDate(date, dao); + } + + /** + * Sets the copy tool split time metadata key for the table. + * @param time the tool copy split time. + * @param dao the {@link AccumuloRyaDAO}. + * @throws RyaDAOException + */ + public static RyaStatement setCopyToolSplitTime(String time, AccumuloRyaDAO dao) throws RyaDAOException { + Date date = null; + try { + date = TIME_FORMATTER.parse(time); + } catch (ParseException e) { + log.error("Unable to parse the copy tool split time: ", e); + } + return setCopyToolSplitDate(date, dao); + } + + /** + * Creates a {@link RegExFilter} setting to ignore the version row in a table. + * @return the {@link RegExFilter} {@link IteratorSetting}. + */ + public static IteratorSetting getVersionRegExFilterSetting() { + IteratorSetting regex = new IteratorSetting(30, "version_regex", RegExFilter.class); + RegExFilter.setRegexs(regex, "(.*)urn:(.*)#version[\u0000|\u0001](.*)", null, null, null, false); + RegExFilter.setNegate(regex, true); + return regex; + } + + /** + * Creates a {@link RegExFilter} setting to ignore the copy tool run time row in a table. + * @return the {@link RegExFilter} {@link IteratorSetting}. + */ + public static IteratorSetting getCopyToolRunTimeRegExFilterSetting() { + IteratorSetting regex = new IteratorSetting(31, COPY_TOOL_RUN_TIME_LOCAL_NAME + "_regex", RegExFilter.class); + RegExFilter.setRegexs(regex, "(.*)urn:(.*)#" + COPY_TOOL_RUN_TIME_LOCAL_NAME + "[\u0000|\u0001](.*)", null, null, null, false); + RegExFilter.setNegate(regex, true); + return regex; + } + + /** + * Creates a {@link RegExFilter} setting to ignore the copy tool split time row in a table. 
+ * @return the {@link RegExFilter} {@link IteratorSetting}. + */ + public static IteratorSetting getCopyToolSplitTimeRegExFilterSetting() { + IteratorSetting regex = new IteratorSetting(32, COPY_TOOL_SPLIT_TIME_LOCAL_NAME + "_regex", RegExFilter.class); + RegExFilter.setRegexs(regex, "(.*)urn:(.*)#" + COPY_TOOL_SPLIT_TIME_LOCAL_NAME + "[\u0000|\u0001](.*)", null, null, null, false); + RegExFilter.setNegate(regex, true); + return regex; + } + + /** + * Creates a {@link RegExFilter} setting to ignore the copy tool time setting row in a table. + * @return the {@link RegExFilter} {@link IteratorSetting}. + */ + public static IteratorSetting getCopyToolTimeOffsetRegExFilterSetting() { + IteratorSetting regex = new IteratorSetting(33, COPY_TOOL_TIME_OFFSET_LOCAL_NAME + "_regex", RegExFilter.class); + RegExFilter.setRegexs(regex, "(.*)urn:(.*)#" + COPY_TOOL_TIME_OFFSET_LOCAL_NAME + "[\u0000|\u0001](.*)", null, null, null, false); + RegExFilter.setNegate(regex, true); + return regex; + } + + /** + * Adds all the common regex filter {@link IteratorSetting}s to the provided {@link Scanner} so + * certain metadata keys in a table are ignored. + * @param scanner the {@link Scanner} to add the regex filter {@link IteratorSetting}s to. + */ + public static void addCommonScannerIteratorsTo(Scanner scanner) { + for (IteratorSetting iteratorSetting : COMMON_REG_EX_FILTER_SETTINGS) { + scanner.addScanIterator(iteratorSetting); + } + } + + /** + * Creates a {@link Scanner} of the provided table name using the specified {@link Configuration}. + * This applies common iterator settings to the table scanner that ignore internal metadata keys. + * @param tablename the name of the table to scan. + * @param config the {@link Configuration}. + * @return the {@link Scanner} for the table. 
+ * @throws IOException + */ + public static Scanner getScanner(String tableName, Configuration config) throws IOException { + return getScanner(tableName, config, true); + } + + /** + * Creates a {@link Scanner} of the provided table name using the specified {@link Configuration}. + * @param tablename the name of the table to scan. + * @param config the {@link Configuration}. + * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner. + * {@code false} otherwise. + * @return the {@link Scanner} for the table. + * @throws IOException + */ + public static Scanner getScanner(String tableName, Configuration config, boolean shouldAddCommonIterators) throws IOException { + try { + String instanceName = config.get(ConfigUtils.CLOUDBASE_INSTANCE); + String zooKeepers = config.get(ConfigUtils.CLOUDBASE_ZOOKEEPERS); + Instance instance; + if (ConfigUtils.useMockInstance(config)) { + instance = new MockInstance(config.get(ConfigUtils.CLOUDBASE_INSTANCE)); + } else { + instance = new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)); + } + String username = ConfigUtils.getUsername(config); + String password = ConfigUtils.getPassword(config); + Connector connector = instance.getConnector(username, new PasswordToken(password)); + Authorizations auths = ConfigUtils.getAuthorizations(config); + + Scanner scanner = connector.createScanner(tableName, auths); + if (shouldAddCommonIterators) { + AccumuloRyaUtils.addCommonScannerIteratorsTo(scanner); + } + return scanner; + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + log.error("Error connecting to " + tableName); + throw new IOException(e); + } + } + + /** + * Prints the table with the specified config and additional settings. + * This applies common iterator settings to the table scanner that ignore internal metadata keys. + * @param tableName the name of the table to print. 
+ * @param config the {@link AccumuloRdfConfiguration}. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTable(String tableName, AccumuloRdfConfiguration config, IteratorSetting... settings) throws IOException { + printTable(tableName, config, true, settings); + } + + /** + * Prints the table with the specified config and additional settings. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner. + * {@code false} otherwise. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTable(String tableName, AccumuloRdfConfiguration config, boolean shouldAddCommonIterators, IteratorSetting... settings) throws IOException { + Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators); + for (IteratorSetting setting : settings) { + scanner.addScanIterator(setting); + } + + Iterator<Entry<Key, Value>> iterator = scanner.iterator(); + + String instance = config.get(MRUtils.AC_INSTANCE_PROP); + log.info("=================="); + log.info("TABLE: " + tableName + " INSTANCE: " + instance); + log.info("------------------"); + while (iterator.hasNext()) { + Entry<Key, Value> entry = iterator.next(); + Key key = entry.getKey(); + Value value = entry.getValue(); + String keyString = getFormattedKeyString(key); + log.info(keyString + " - " + value); + } + log.info("=================="); + } + + private static String getFormattedKeyString(Key key) { + StringBuilder sb = new StringBuilder(); + byte[] row = key.getRow().getBytes(); + byte[] colFamily = key.getColumnFamily().getBytes(); + byte[] colQualifier = key.getColumnQualifier().getBytes(); + byte[] colVisibility = key.getColumnVisibility().getBytes(); + int 
maxRowDataToPrint = 256; + Key.appendPrintableString(row, 0, row.length, maxRowDataToPrint, sb); + sb.append(" "); + Key.appendPrintableString(colFamily, 0, colFamily.length, maxRowDataToPrint, sb); + sb.append(":"); + Key.appendPrintableString(colQualifier, 0, colQualifier.length, maxRowDataToPrint, sb); + sb.append(" ["); + Key.appendPrintableString(colVisibility, 0, colVisibility.length, maxRowDataToPrint, sb); + sb.append("]"); + sb.append(" "); + sb.append(new Date(key.getTimestamp())); + //sb.append(Long.toString(key.getTimestamp())); + //sb.append(" "); + //sb.append(key.isDeleted()); + return sb.toString(); + } + + /** + * Prints the table with pretty formatting using the specified config and additional settings. + * This applies common iterator settings to the table scanner that ignore internal metadata keys. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTablePretty(String tableName, Configuration config, IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException { + printTablePretty(tableName, config, true, settings); + } + + /** + * Prints the table with pretty formatting using the specified config and additional settings. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner. + * {@code false} otherwise. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTablePretty(String tableName, Configuration config, boolean shouldAddCommonIterators, IteratorSetting... 
settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException { + Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators); + for (IteratorSetting setting : settings) { + scanner.addScanIterator(setting); + } + + String format = "| %-64s | %-24s | %-28s | %-20s | %-20s | %-10s |"; + int totalFormatLength = String.format(format, 1, 2, 3, 4, 5, 6).length(); + String instance = config.get(MRUtils.AC_INSTANCE_PROP); + log.info(StringUtils.rightPad("==================", totalFormatLength, "=")); + log.info(StringUtils.rightPad("| TABLE: " + tableName + " INSTANCE: " + instance, totalFormatLength - 1) + "|"); + log.info(StringUtils.rightPad("------------------", totalFormatLength, "-")); + log.info(String.format(format, "--Row--", "--ColumnVisibility--", "--Timestamp--", "--ColumnFamily--", "--ColumnQualifier--", "--Value--")); + log.info(StringUtils.rightPad("|-----------------", totalFormatLength - 1, "-") + "|"); + for (Entry<Key, Value> entry : scanner) { + Key k = entry.getKey(); + String rowString = Key.appendPrintableString(k.getRow().getBytes(), 0, k.getRow().getLength(), Constants.MAX_DATA_TO_PRINT, new StringBuilder()).toString(); + log.info(String.format(format, rowString, k.getColumnVisibility(), new Date(k.getTimestamp()), k.getColumnFamily(), k.getColumnQualifier(), entry.getValue())); + } + log.info(StringUtils.rightPad("==================", totalFormatLength, "=")); + } + + /** + * Adds authorizations to a user's authorizations list. + * @param user the name of the user to add authorizations for. + * @param secOps the {@link SecurityOperations}. + * @param auths the {@link Authorizations} to add + * @return the {@link Authorizations}. 
+ * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public static Authorizations addUserAuths(String user, SecurityOperations secOps, Authorizations auths) throws AccumuloException, AccumuloSecurityException { + List<String> authList = new ArrayList<>(); + for (byte[] authBytes : auths.getAuthorizations()) { + String auth = new String(authBytes); + authList.add(auth); + } + return addUserAuths(user, secOps, authList.toArray(new String[0])); + } + + /** + * Adds authorizations to a user's authorizations list. + * @param user the name of the user to add authorizations for. + * @param secOps the {@link SecurityOperations}. + * @param auths the list of authorizations to add + * @return the {@link Authorizations}. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public static Authorizations addUserAuths(String user, SecurityOperations secOps, String... auths) throws AccumuloException, AccumuloSecurityException { + Authorizations currentUserAuths = secOps.getUserAuthorizations(user); + List<byte[]> authList = new ArrayList<>(); + for (byte[] currentAuth : currentUserAuths.getAuthorizations()) { + authList.add(currentAuth); + } + for (String newAuth : auths) { + authList.add(newAuth.getBytes()); + } + Authorizations result = new Authorizations(authList); + return result; + } + + /** + * Removes the specified authorizations from the user. + * @param userName the name of the user to change authorizations for. + * @param secOps the {@link SecurityOperations} to change. + * @param authsToRemove the comma-separated string of authorizations to remove. 
+ * @throws AccumuloSecurityException + * @throws AccumuloException + */ + public static void removeUserAuths(String userName, SecurityOperations secOps, String authsToRemove) throws AccumuloException, AccumuloSecurityException { + Authorizations currentUserAuths = secOps.getUserAuthorizations(userName); + List<String> authList = convertAuthStringToList(currentUserAuths.toString()); + + List<String> authsToRemoveList = convertAuthStringToList(authsToRemove); + authList.removeAll(authsToRemoveList); + + String authString = Joiner.on(",").join(authList); + Authorizations newAuths = new Authorizations(authString); + + secOps.changeUserAuthorizations(userName, newAuths); + } + + /** + * Convert the comma-separated string of authorizations into a list of authorizations. + * @param authString the comma-separated string of authorizations. + * @return a {@link List} of authorization strings. + */ + public static List<String> convertAuthStringToList(String authString) { + List<String> authList = new ArrayList<>(); + if (authString != null) { + String[] authSplit = authString.split(","); + authList.addAll(Arrays.asList(authSplit)); + } + return authList; + } + + /** + * Sets up a {@link Connector} with the specified config. + * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}. + * @return the {@link Connector}. + */ + public static Connector setupConnector(AccumuloRdfConfiguration accumuloRdfConfiguration) { + Connector connector = null; + try { + connector = ConfigUtils.getConnector(accumuloRdfConfiguration); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Error creating connector", e); + } + + return connector; + } + + /** + * Sets up a {@link AccumuloRyaDAO} with the specified connector. + * @param connector the {@link Connector}. + * @return the {@link AccumuloRyaDAO}. 
+ */ + public static AccumuloRyaDAO setupDao(AccumuloRdfConfiguration accumuloRdfConfiguration) { + Connector connector = setupConnector(accumuloRdfConfiguration); + return setupDao(connector, accumuloRdfConfiguration); + } + + /** + * Sets up a {@link AccumuloRyaDAO} with the specified connector and config. + * @param connector the {@link Connector}. + * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}. + * @return the {@link AccumuloRyaDAO}. + */ + public static AccumuloRyaDAO setupDao(Connector connector, AccumuloRdfConfiguration accumuloRdfConfiguration) { + AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); + accumuloRyaDao.setConnector(connector); + accumuloRyaDao.setConf(accumuloRdfConfiguration); + + try { + accumuloRyaDao.init(); + } catch (RyaDAOException e) { + log.error("Error initializing DAO", e); + } + + return accumuloRyaDao; + } + + /** + * Creates a copy tool parent time offset {@link RyaStatement} from the specified offset. + * @param timeOffset the copy tool parent time offset. (in milliseconds). + * @return the {@link RyaStatement} for the copy tool parent time offset. + */ + public static RyaStatement createTimeOffsetRyaStatement(long timeOffset) { + Literal literal = VALUE_FACTORY.createLiteral(timeOffset); + RyaType timeObject = new RyaType(literal.getDatatype(), literal.stringValue()); + return new RyaStatement(RTS_SUBJECT_RYA, RTS_TIME_OFFSET_PREDICATE_RYA, timeObject); + } + + /** + * Gets the copy tool parent time offset metadata key for the table. + * @param dao the {@link AccumuloRyaDAO}. + * @return the difference between the parent machine's system time and + * the NTP server's time or {@code null}. 
+ * @throws RyaDAOException + */ + public static Long getTimeOffset(AccumuloRyaDAO dao) throws RyaDAOException { + String timeOffsetString = getMetadata(RTS_TIME_OFFSET_PREDICATE_RYA, dao); + Long timeOffset = null; + if (timeOffsetString != null) { + timeOffset = Long.valueOf(timeOffsetString); + } + return timeOffset; + } + + /** + * Sets the copy tool parent time offset metadata key for the table. + * @param timeOffset the difference between the parent machine's system time and + * the NTP server's time. + * @param dao the {@link AccumuloRyaDAO}. + * @throws RyaDAOException + */ + public static RyaStatement setTimeOffset(long timeOffset, AccumuloRyaDAO dao) throws RyaDAOException { + RyaStatement ryaStatement = createTimeOffsetRyaStatement(timeOffset); + dao.add(ryaStatement); + return ryaStatement; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/CopyRule.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/CopyRule.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/CopyRule.java new file mode 100644 index 0000000..b398c2b --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/CopyRule.java @@ -0,0 +1,383 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.And;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.QueryModelNodeBase;
+import org.openrdf.query.algebra.QueryModelVisitor;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.EvaluationStrategy;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import mvm.rya.accumulo.mr.merge.util.QueryRuleset.QueryRulesetException;
+
+/**
+ * A rule that defines a subset of statements to copy at the RDF level. Consists of a
+ * statement pattern and an optional filter expression.
+ */
+public class CopyRule extends QueryModelNodeBase {
+    /** The boolean literal {@code true} as a constant expression; such a condition is treated as trivial. */
+    private static final ValueConstant TRUE = new ValueConstant(ValueFactoryImpl.getInstance().createLiteral(true));
+
+    // Random suffix, generated once when the class is initialized and shared by all
+    // canonical variable names below. Presumably this keeps them from clashing with
+    // variable names that appear in a query -- TODO confirm.
+    private static final String SUFFIX = UUID.randomUUID().toString();
+    /** Canonical variable standing for the subject position. */
+    private static final Var SUBJ_VAR = new Var("subject_" + SUFFIX);
+    /** Canonical variable standing for the predicate position. */
+    private static final Var PRED_VAR = new Var("predicate_" + SUFFIX);
+    /** Canonical variable standing for the object position. */
+    private static final Var OBJ_VAR = new Var("object_" + SUFFIX);
+    /** Canonical variable standing for the context position. */
+    private static final Var CON_VAR = new Var("context_" + SUFFIX);
+    /** Name given to condition variables that the rule's statement pattern does not define. */
+    private static final Var UNDEFINED_VAR = new Var("undefined_" + SUFFIX);
+
+    /**
+     * Return whether a concrete statement satisfies a condition.
+     * @param stmt A statement from the input
+     * @param condition The condition to test.
Expects variables expressed using the standard variable + * names used by this class, so it must be a rule's condition or derived from rules' conditions. + * @param strategy The evaluation strategy to apply + * @return True if the statement matches the condition + * @throws ValueExprEvaluationException + * @throws QueryEvaluationException + */ + public static boolean accept(Statement stmt, ValueExpr condition, EvaluationStrategy strategy) + throws ValueExprEvaluationException, QueryEvaluationException { + QueryBindingSet bindings = new QueryBindingSet(); + bindings.addBinding(SUBJ_VAR.getName(), stmt.getSubject()); + bindings.addBinding(PRED_VAR.getName(), stmt.getPredicate()); + bindings.addBinding(OBJ_VAR.getName(), stmt.getObject()); + if (stmt.getContext() != null) { + bindings.addBinding(CON_VAR.getName(), stmt.getContext()); + } + return strategy.isTrue(condition, bindings); + } + + /** + * Detect that a condition is trivial and can be replaced. This may mean it is always + * true, or that it contains undefined variables and therefore we must assume it is + * true or risk missing relevant statements. + * @param expr Condition to be evaluated + * @return true if the condition is trivial + */ + private static boolean trivialCondition(ValueExpr expr) { + // If the expression is null or the constant "true": + if (expr == null || expr.equals(TRUE)) { + return true; + } + // If the expression contains undefined variables: + VarSearchVisitor visitor = new VarSearchVisitor(UNDEFINED_VAR.getName()); + expr.visit(visitor); + if (visitor.found) { + return true; + } + // Otherwise, the condition is non-trivial. + return false; + } + + /** + * Visitor that checks a tree for the existence of a given variable name. 
+ */ + private static class VarSearchVisitor extends QueryModelVisitorBase<RuntimeException> { + boolean found = false; + private String queryVar; + public VarSearchVisitor(String queryVar) { + this.queryVar = queryVar; + } + @Override + public void meet(Var var) { + if (queryVar.equals(var.getName())) { + found = true; + } + } + @Override + public void meetNode(QueryModelNode node) { + if (!found) { + node.visitChildren(this); + } + } + } + + /** + * Visitor that standardizes variables to canonical names and restructures + * operators to preserve meaningful expressions while eliminating undefined + * conditions. + */ + private static class RuleVisitor extends QueryModelVisitorBase<RuntimeException> { + private CopyRule rule; + RuleVisitor(CopyRule rule) { + this.rule = rule; + } + @Override + public void meet(Var node) { + String oldName = node.getName(); + if (rule.varMap.containsKey(oldName)) { + node.setName(rule.varMap.get(oldName).getName()); + } + else { + if (node.hasValue() || node.equals(SUBJ_VAR) || node.equals(PRED_VAR) || node.equals(OBJ_VAR) || node.equals(CON_VAR)) { + return; + } + node.setName(UNDEFINED_VAR.getName()); + } + } + /** + * If we can't evaluate one half of an AND by looking at this statement alone, it might + * turn out to be true in the full graph, so the statement is relevant if the half we + * can evaluate is true. If we can't evaluate either half, then the AND is useless and + * we must assume the statement is relevant. Otherwise, keep both sides. 
+         */
+        @Override
+        public void meet(And expr) {
+            // Capture both operands, then rewrite each subtree before judging it.
+            // NOTE(review): left and right are read before the visits, so if a visit
+            // replaces a child of expr, the checks below still inspect the nodes captured
+            // here rather than the replacements -- confirm this is intended.
+            ValueExpr left = expr.getLeftArg();
+            ValueExpr right = expr.getRightArg();
+            left.visit(this);
+            right.visit(this);
+            QueryModelNode parent = expr.getParentNode();
+            if (trivialCondition(left)) {
+                if (trivialCondition(right)) {
+                    // Both sides are trivial; replace whole node
+                    parent.replaceChildNode(expr, null);
+                }
+                else {
+                    // Left side trivial, right side good; replace node with right arg
+                    parent.replaceChildNode(expr, right);
+                }
+            }
+            else if (trivialCondition(right)) {
+                // Left side good, right side trivial; replace node with left arg
+                parent.replaceChildNode(expr, left);
+            }
+            // Otherwise, both sides are useful
+        }
+    }
+
+    // The statement pattern this rule matches.
+    private StatementPattern statement;
+    // Optional filter condition; null when the rule has no condition.
+    private ValueExpr condition;
+    // Original variable name -> canonical variable that replaced it in the statement pattern.
+    private Map<String, Var> varMap = new HashMap<>();
+    // Visitor applied to the condition whenever a condition is added.
+    private RuleVisitor visitor = new RuleVisitor(this);
+
+    /**
+     * Instantiate a rule containing a StatementPattern, renaming any variables to canonical
+     * subject/predicate/object forms and saving the mappings from the original variable names.
+     * @param sp StatementPattern defining a set of triples to match. It is modified in
+     * place: each subject, predicate, object or context var without a value is replaced
+     * by the corresponding canonical variable.
+     * @throws QueryRulesetException if none of subject, predicate and object has a value
+     */
+    public CopyRule(StatementPattern sp) throws QueryRulesetException {
+        statement = sp;
+        // Read all four vars up front, before any of them is replaced below.
+        Var subjVar = statement.getSubjectVar();
+        Var predVar = statement.getPredicateVar();
+        Var objVar = statement.getObjectVar();
+        Var conVar = statement.getContextVar();
+        // Counts how many of subject/predicate/object carry no value.
+        int variables = 0;
+        if (subjVar == null || !subjVar.hasValue()) {
+            sp.setSubjectVar(SUBJ_VAR);
+            if (subjVar != null) {
+                // Remember the original name so conditions can be renamed consistently.
+                varMap.put(subjVar.getName(), SUBJ_VAR);
+            }
+            variables++;
+        }
+        if (predVar == null || !predVar.hasValue()) {
+            sp.setPredicateVar(PRED_VAR);
+            if (predVar != null) {
+                varMap.put(predVar.getName(), PRED_VAR);
+            }
+            variables++;
+        }
+        if (objVar == null || !objVar.hasValue()) {
+            sp.setObjectVar(OBJ_VAR);
+            if (objVar != null) {
+                varMap.put(objVar.getName(), OBJ_VAR);
+            }
+            variables++;
+        }
+        if (variables == 3) {
+            throw new QueryRulesetException("Statement pattern with no constants would match every statement:\n" + sp);
+        }
+        // The context does not count towards the check above; a missing context var is left as is.
+        if (conVar != null && !conVar.hasValue()) {
+            sp.setContextVar(CON_VAR);
+            varMap.put(conVar.getName(), CON_VAR);
+        }
+    }
+
+    /**
+     * Set the complete condition and make this rule its parent node.
+     * @param newCondition the new condition; must not be null, since it is dereferenced.
+     */
+    private void setCondition(ValueExpr newCondition) {
+        this.condition = newCondition;
+        this.condition.setParentNode(this);
+    }
+
+    /**
+     * Constrain a rule with a filter condition. If there are already conditions on the rule, they
+     * will be ANDed together. If this rule doesn't define all the variables used in the condition
+     * (because the rule only matches one statement pattern), it will assume those portions match,
+     * so that we are guaranteed to include all relevant statements.
+ * @param condition A boolean filter expression + */ + public void addCondition(ValueExpr condition) { + ValueExpr newCondition = condition.clone(); + if (this.condition == null) { + setCondition(newCondition); + } + else { + this.setCondition(new And(this.condition, newCondition)); + } + this.condition.visit(visitor); + // If, after rewriting, the condition still contains undefined variables, we can't + // meaningfully apply it to reject statements. + if (trivialCondition(this.condition)) { + this.condition = null; + } + } + + /** + * Get the rule's boolean filter condition, if any. + * @return a ValueExpr that can be applied to any matching statements, or null. + */ + public ValueExpr getCondition() { + return condition; + } + + /** + * Get the rule's statement pattern. + * @return A StatementPattern that defines which statements match the rule. + */ + public StatementPattern getStatement() { + return statement; + } + + /** + * Validate that this is a non-trivial rule. A trivial rule consists of + * a statement pattern whose subject, predicate, and object are all + * variables (therefore it would match every statement). + * @return true if the rule is valid (non-trivial) + */ + void validate() throws QueryRulesetException { + if (!(statement.getSubjectVar().isConstant() + || statement.getPredicateVar().isConstant() + || statement.getObjectVar().isConstant())) { + throw new QueryRulesetException("Statement pattern with no constants would match every statement:\n" + statement.toString()); + } + } + + /** + * Replace a node in the rule's condition with some replacement. If + * @param current If this is found in the condition, apply the replacement + * @param replacement Replace with this. If replacing the top-level node, the replacement + * must be a ValueExpr or null. 
+     */
+    @Override
+    public void replaceChildNode(QueryModelNode current, QueryModelNode replacement) {
+        if (current.equals(condition) && replacement instanceof ValueExpr) {
+            // Replacing the whole condition with another expression (stored as a copy).
+            setCondition(((ValueExpr) replacement).clone());
+        }
+        else if (current.equals(condition) && replacement == null) {
+            // Removing the whole condition.
+            condition = null;
+        }
+        else if (condition != null) {
+            // Otherwise delegate to the condition so it can replace one of its own children.
+            // NOTE(review): replacement.clone() throws NullPointerException when replacement
+            // is null and current is not the top-level condition -- confirm callers never do that.
+            condition.replaceChildNode(current, replacement.clone());
+        }
+    }
+
+    /**
+     * Apply a visitor to both the statement and any conditions.
+     */
+    @Override
+    public <X extends Exception> void visit(QueryModelVisitor<X> visitor) throws X {
+        if (statement != null) {
+            statement.visit(visitor);
+        }
+        if (condition != null) {
+            condition.visit(visitor);
+        }
+    }
+
+    /**
+     * Render the statement pattern, followed by the condition (if any) under a
+     * "Condition:" heading.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(statement.toString().trim());
+        if (condition != null) {
+            sb.append("\n Condition:\n \t");
+            sb.append(condition.toString().trim().replace("\n", "\n \t"));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Two rules are equal when both are exactly of class CopyRule and their statement
+     * patterns and conditions are equal (or both absent).
+     * @param obj the object to compare with; may be null
+     * @return true if obj is an equal rule, false otherwise (including for null)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // The equals contract requires false for null instead of a NullPointerException.
+        if (obj == null || !obj.getClass().equals(CopyRule.class)) {
+            return false;
+        }
+        CopyRule other = (CopyRule) obj;
+        if ((this.statement != null && !this.statement.equals(other.statement))
+                || (this.statement == null && other.statement != null)) {
+            return false;
+        }
+        if ((this.condition != null && !this.condition.equals(other.condition))
+                || (this.condition == null && other.condition != null)) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Hash derived from the statement pattern and the condition, the same two fields
+     * that equals compares.
+     */
+    @Override
+    public int hashCode() {
+        int result = statement == null ? 0 : statement.hashCode();
+        result += result * 41 + (condition == null ? 0 : condition.hashCode());
+        return result;
+    }
+
+    /**
+     * Detect whether this rule is at least as general as another.
+     * @param other Rule to compare against
+     * @return true if this rule will necessarily match everything the other rule would.
+ */ + public boolean isGeneralizationOf(CopyRule other) { + if (this.statement == null || other.statement == null) { + return false; + } + // If each component of the statement and the condition are at least as general + // as the other rule's, then this rule is at least as general. + return varIsGeneralization(this.statement.getSubjectVar(), other.statement.getSubjectVar()) + && varIsGeneralization(this.statement.getPredicateVar(), other.statement.getPredicateVar()) + && varIsGeneralization(this.statement.getObjectVar(), other.statement.getObjectVar()) + && varIsGeneralization(this.statement.getContextVar(), other.statement.getContextVar()) + && (this.condition == null || this.condition.equals(other.condition)); + } + + /** + * Determine whether the first variable is at least as general as the second. + */ + private static boolean varIsGeneralization(Var first, Var second) { + if (first == null || !first.hasValue()) { + // if first is a variable, it is at least as general + return true; + } + // Otherwise, it is only at least as general if they are the same value + return second != null && first.getValue().equals(second.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/GroupedRow.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/GroupedRow.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/GroupedRow.java new file mode 100644 index 0000000..a1ef2f1 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/GroupedRow.java @@ -0,0 +1,182 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.lang.builder.CompareToBuilder;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * Composite {@link WritableComparable} consisting of an Accumulo row and a string representing a group, such that
+ * records can be grouped by the group name and sorted based on the {@link Key} and {@link Value}. Natural ordering
+ * (compareTo) compares group followed by key followed by value; SortComparator uses the natural ordering while
+ * GroupComparator sorts only based on group.
+ */
+public class GroupedRow implements WritableComparable<GroupedRow> {
+    // Label of the group this row belongs to; starts out as an empty Text.
+    private Text group = new Text();
+    // Key portion of the Accumulo row; starts out as a default Key.
+    private Key key = new Key();
+    // Value portion of the Accumulo row; starts out as a default Value.
+    private Value value = new Value();
+
+    /**
+     * Set the group that this row belongs to.
+ * @param name A common label + */ + public void setGroup(String name) { + group.set(name); + } + + /** + * Set the Key + * @param key Key associated with an Accumulo row + */ + public void setKey(Key key) { + this.key = key; + } + + /** + * Set the Value + * @param value Value associated with an Accumulo row + */ + public void setValue(Value value) { + this.value = value; + } + + /** + * Get the group name + * @return A label common to all rows that should be grouped together + */ + public Text getGroup() { + return group; + } + + /** + * Get the Key + * @return The key portion of the row + */ + public Key getKey() { + return key; + } + + /** + * Get the Value + * @return The value portion of the row + */ + public Value getValue() { + return value; + } + + /** + * Serialize the group, key, and value + */ + @Override + public void write(DataOutput out) throws IOException { + group.write(out); + key.write(out); + value.write(out); + } + + /** + * Deserialize the group, key, and value + */ + @Override + public void readFields(DataInput in) throws IOException { + group.readFields(in); + key.readFields(in); + value.readFields(in); + } + + /** + * Natural ordering; compares based on group and then key. + */ + @Override + public int compareTo(GroupedRow o) { + if (o == null) { + return 1; + } + return new CompareToBuilder().append(this.group, o.group).append(this.key, o.key).append(this.value, o.value).toComparison(); + } + + /** + * Generates a hash based on group, key, and value. + */ + @Override + public int hashCode() { + return Objects.hash(group, key, value); + } + + /** + * Test equality (group, key, value). 
+ */ + @Override + public boolean equals(Object o) { + if (o == this) { return true; } + if (o != null && o instanceof GroupedRow) { + GroupedRow other = (GroupedRow) o; + return new EqualsBuilder().append(this.group, other.group).append(this.key, other.key).append(this.value, other.value).isEquals(); + } + return false; + } + + /** + * Comparator that sorts by group name, then by Key, then by Value. + */ + public static class SortComparator extends WritableComparator { + SortComparator() { + super(GroupedRow.class, true); + } + /** + * Compares the groups of two GroupedRow instances, and the keys if they share a group. + */ + @Override + public int compare(WritableComparable wc1, WritableComparable wc2) { + GroupedRow gk1 = (GroupedRow) wc1; + GroupedRow gk2 = (GroupedRow) wc2; + return gk1.compareTo(gk2); + } + } + + /** + * Comparator that only sorts by group, ignoring Key. + */ + public static class GroupComparator extends WritableComparator { + GroupComparator() { + super(GroupedRow.class, true); + } + /** + * Compares the groups of two GroupedRow instances. + */ + @Override + public int compare(WritableComparable wc1, WritableComparable wc2) { + GroupedRow gk1 = (GroupedRow) wc1; + GroupedRow gk2 = (GroupedRow) wc2; + return gk1.group.compareTo(gk2.group); + } + } +}
