http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/demo/util/DemoUtilities.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/demo/util/DemoUtilities.java b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/demo/util/DemoUtilities.java new file mode 100644 index 0000000..98aebfc --- /dev/null +++ b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/demo/util/DemoUtilities.java @@ -0,0 +1,108 @@ +package mvm.rya.accumulo.mr.merge.demo.util; + +import java.util.Scanner; + +import org.apache.commons.math3.random.RandomDataGenerator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; + +/** + * Utilities methods for the demo. + */ +public final class DemoUtilities { + private static final Logger log = Logger.getLogger(DemoUtilities.class); + + /** + * Holds different logging patterns to use. + */ + public static enum LoggingDetail { + /** + * Uses a light pattern layout. + * */ + LIGHT(new PatternLayout("%-5p - %m%n")), + /** + * Uses a detailed pattern layout. + */ + DETAILED(new PatternLayout("%d{MMM dd yyyy HH:mm:ss} %5p [%t] (%F:%L) - %m%n")); + + private PatternLayout patternLayout; + + /** + * Create a new {@link LoggingDetail}. + * @param patternLayout the {@link PatternLayout}. + */ + private LoggingDetail(PatternLayout patternLayout) { + this.patternLayout = patternLayout; + } + + /** + * @return the {@link PatternLayout}. + */ + public PatternLayout getPatternLayout() { + return patternLayout; + } + } + + private static final Scanner KEYBOARD_SCANNER = new Scanner(System.in); + + /** + * Private constructor to prevent instantiation. + */ + private DemoUtilities() { + } + + /** + * Generates a random {@code long} number within the specified range. 
+ * @param min the minimum value (inclusive) of the range of {@code long}s to include. + * @param max the maximum value (inclusive) of the range of {@code long}s to include. + * @return the random {@code long}. + */ + public static long randLong(final long min, final long max) { + return new RandomDataGenerator().nextLong(min, max); + } + + /** + * Sets up log4j logging to fit the demo's needs. + */ + public static void setupLogging() { + setupLogging(LoggingDetail.LIGHT); + } + + /** + * Sets up log4j logging to fit the demo's needs. + * @param loggingDetail the {@link LoggingDetail} to use. + */ + public static void setupLogging(LoggingDetail loggingDetail) { + // Turn off all the loggers and customize how they write to the console. + final Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.OFF); + final ConsoleAppender ca = (ConsoleAppender) rootLogger.getAppender("stdout"); + ca.setLayout(loggingDetail.getPatternLayout()); + + // Turn the loggers used by the demo back on. + //log.setLevel(Level.INFO); + rootLogger.setLevel(Level.INFO); + } + + /** + * Pauses the program until the user presses "Enter" in the console. + */ + public static void promptEnterKey() { + promptEnterKey(true); + } + + /** + * Pauses the program until the user presses "Enter" in the console. + * @param isPromptEnabled {@code true} if prompt display is enabled. {@code false} + * otherwise. + */ + public static void promptEnterKey(boolean isPromptEnabled) { + if (isPromptEnabled) { + log.info("Press \"ENTER\" to continue..."); + KEYBOARD_SCANNER.nextLine(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/AccumuloDualInstanceDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/AccumuloDualInstanceDriver.java b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/AccumuloDualInstanceDriver.java new file mode 100644 index 0000000..9360c07 --- /dev/null +++ b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/AccumuloDualInstanceDriver.java @@ -0,0 +1,592 @@ +package mvm.rya.accumulo.mr.merge.driver; + +import java.io.File; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.merge.util.AccumuloInstanceDriver; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import twitter4j.Logger; + +/** + * Handles running a {@link MiniAccumuloCluster} or a {@link MockInstance} for a parent and child instance for testing. 
+ */ +public class AccumuloDualInstanceDriver { + private static final Logger log = Logger.getLogger(AccumuloDualInstanceDriver.class); + + private boolean isMock; + private boolean shouldCreateIndices; + private boolean isParentReadOnly; + private boolean isChildReadOnly; + private boolean doesChildInitiallyExist; + + public static final String PARENT_USER_NAME = "parent_user"; + public static final String PARENT_PASSWORD = "parent_pwd"; + public static final String PARENT_INSTANCE = "parent_instance"; + public static final String PARENT_TABLE_PREFIX = "pt_"; + public static final String PARENT_AUTH = "parent_auth"; + public static final ColumnVisibility PARENT_COLUMN_VISIBILITY = new ColumnVisibility(PARENT_AUTH); + + public static final String CHILD_USER_NAME = "child_user"; + public static final String CHILD_PASSWORD = "child_pwd"; + public static final String CHILD_INSTANCE = "child_instance"; + public static final String CHILD_TABLE_PREFIX = "ct_"; + public static final String CHILD_AUTH = "child_auth"; + public static final ColumnVisibility CHILD_COLUMN_VISIBILITY = new ColumnVisibility(CHILD_AUTH); + + private AccumuloInstanceDriver parentAccumuloInstanceDriver; + private AccumuloInstanceDriver childAccumuloInstanceDriver; + + /** + * Creates a new instance of {@link AccumuloDualInstanceDriver}. + * @param isMock {@code true} if the parent and child instances will use {@link MockInstance}s. + * {@code false} if the parent and child instances will use {@link MiniAccumuloCluster}s. + * @param shouldCreateIndices {@code true} to create all the indices associated with a Rya deployment. + * {@code false} otherwise. + * @param isParentReadOnly {@code true} if all the tables in the parent instance should have their + * table permissions set to read only. {@code false} if the table permission are set to write. + * @param isChildReadOnly {@code true} if all the tables in the child instance should have their + * table permissions set to read only. 
{@code false} if the table permission are set to write. + * @param doesChildInitiallyExist {@code true} if all the child instance exists initially. + * {@code false} otherwise. + */ + public AccumuloDualInstanceDriver(boolean isMock, boolean shouldCreateIndices, boolean isParentReadOnly, boolean isChildReadOnly, boolean doesChildInitiallyExist) { + this.isMock = isMock; + this.shouldCreateIndices = shouldCreateIndices; + this.isParentReadOnly = isParentReadOnly; + this.isChildReadOnly = isChildReadOnly; + this.doesChildInitiallyExist = doesChildInitiallyExist; + String parentUser = isMock ? PARENT_USER_NAME : AccumuloInstanceDriver.ROOT_USER_NAME; + String childUser = isMock ? CHILD_USER_NAME : AccumuloInstanceDriver.ROOT_USER_NAME; + parentAccumuloInstanceDriver = new AccumuloInstanceDriver("Parent", isMock, shouldCreateIndices, isParentReadOnly, true, parentUser, PARENT_PASSWORD, PARENT_INSTANCE, PARENT_TABLE_PREFIX, PARENT_AUTH); + childAccumuloInstanceDriver = new AccumuloInstanceDriver("Child", isMock, shouldCreateIndices, isChildReadOnly, false, childUser, CHILD_PASSWORD, CHILD_INSTANCE, CHILD_TABLE_PREFIX, CHILD_AUTH); + } + + /** + * Sets up the parent and child {@link AccumuloInstanceDriver}s. + * @throws Exception + */ + public void setUp() throws Exception { + log.info("Setting up parent and child drivers."); + setUpInstances(); + setUpTables(); + setUpDaos(); + setUpConfigs(); + } + + /** + * Sets up the parent and child instances. + * @throws Exception + */ + public void setUpInstances() throws Exception { + parentAccumuloInstanceDriver.setUpInstance(); + if (doesChildInitiallyExist) { + childAccumuloInstanceDriver.setUpInstance(); + } + } + + /** + * Sets up all the tables and indices for the parent and child instances. 
+ * @throws Exception + */ + public void setUpTables() throws Exception { + parentAccumuloInstanceDriver.setUpTables(); + if (doesChildInitiallyExist) { + childAccumuloInstanceDriver.setUpTables(); + } + } + + /** + * Sets up the {@link AccumuloRyaDAO}s for the parent and child instances. + * @throws Exception + */ + public void setUpDaos() throws Exception { + parentAccumuloInstanceDriver.setUpDao(); + if (doesChildInitiallyExist) { + childAccumuloInstanceDriver.setUpDao(); + } + } + + /** + * Sets up the configuration and prints the arguments for the parent and child instances. + */ + public void setUpConfigs() { + parentAccumuloInstanceDriver.setUpConfig(); + childAccumuloInstanceDriver.setUpConfig(); + } + + /** + * Tears down all the tables and indices for the parent and child instances. + * @throws Exception + */ + public void tearDownTables() throws Exception { + parentAccumuloInstanceDriver.tearDownTables(); + childAccumuloInstanceDriver.tearDownTables(); + } + + /** + * Tears down the {@link AccumuloRyaDAO}s for the parent and child instances. + * @throws Exception + */ + public void tearDownDaos() throws Exception { + parentAccumuloInstanceDriver.tearDownDao(); + childAccumuloInstanceDriver.tearDownDao(); + } + + /** + * Tears down the parent and child instances. + * @throws Exception + */ + public void tearDownInstances() throws Exception { + parentAccumuloInstanceDriver.tearDownInstance(); + childAccumuloInstanceDriver.tearDownInstance(); + } + + /** + * Tears down the {@link AccumuloInstanceDriver} for the parent and child instances. + * @throws Exception + */ + public void tearDown() throws Exception { + try { + //tearDownTables(); + tearDownDaos(); + tearDownInstances(); + } finally { + removeTempDirs(); + } + } + + /** + * Deletes the {@link MiniAccumuloCluster} temporary directories for the parent and child instances. 
+ */ + private void removeTempDirs() { + parentAccumuloInstanceDriver.removeTempDir(); + childAccumuloInstanceDriver.removeTempDir(); + } + + /** + * Adds authorizations to the {@link SecurityOperations} of the parent instance's user. + * @param auths the list of authorizations to add. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public void addParentAuths(String... auths) throws AccumuloException, AccumuloSecurityException { + parentAccumuloInstanceDriver.addAuths(auths); + } + + /** + * Adds authorizations to the {@link SecurityOperations} of the child instance's user. + * @param auths the list of authorizations to add. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public void addChildAuths(String... auths) throws AccumuloException, AccumuloSecurityException { + childAccumuloInstanceDriver.addAuths(auths); + } + + /** + * @return the {@link Authorizations} of the parent instance's user. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public Authorizations getParentAuths() throws AccumuloException, AccumuloSecurityException { + return parentAccumuloInstanceDriver.getAuths(); + } + + /** + * @return the {@link Authorizations} of the child instance's user. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public Authorizations getChildAuths() throws AccumuloException, AccumuloSecurityException { + return childAccumuloInstanceDriver.getAuths(); + } + + /** + * Adds a {@link Collection} of {@link RyaStatement}s to the parent instance's DAO. + * @param ryaStatements the {@link Collection} of {@link RyaStatement}s. + * @throws RyaDAOException + */ + public void addParentRyaStatements(Collection<RyaStatement> ryaStatements) throws RyaDAOException { + addRyaStatements(ryaStatements.iterator(), parentAccumuloInstanceDriver.getDao()); + } + + /** + * Adds a {@link Collection} of {@link RyaStatement}s to the child instance's DAO. 
+ * @param ryaStatements the {@link Collection} of {@link RyaStatement}s. + * @throws RyaDAOException + */ + public void addChildRyaStatements(Collection<RyaStatement> ryaStatements) throws RyaDAOException { + addRyaStatements(ryaStatements.iterator(), childAccumuloInstanceDriver.getDao()); + } + + /** + * Adds {@link RyaStatement}s to the parent instance's DAO from the provided {@link Iterator}. + * @param ryaStatementIterator the {@link RyaStatement} {@link Iterator}. + * @throws RyaDAOException + */ + public void addParentRyaStatements(Iterator<RyaStatement> ryaStatementIterator) throws RyaDAOException { + addRyaStatements(ryaStatementIterator, parentAccumuloInstanceDriver.getDao()); + } + + /** + * Adds {@link RyaStatement}s to the child instance's DAO from the provided {@link Iterator}. + * @param ryaStatementIterator the {@link RyaStatement} {@link Iterator}. + * @throws RyaDAOException + */ + public void addChildRyaStatements(Iterator<RyaStatement> ryaStatementIterator) throws RyaDAOException { + addRyaStatements(ryaStatementIterator, childAccumuloInstanceDriver.getDao()); + } + + /** + * Adds a {@link RyaStatement} to the parent instance's DAO. + * @param ryaStatement the {@link RyaStatement}. + * @throws RyaDAOException + */ + public void addParentRyaStatement(RyaStatement ryaStatement) throws RyaDAOException { + addRyaStatement(ryaStatement, parentAccumuloInstanceDriver.getDao()); + } + + /** + * Adds a {@link RyaStatement} to the child instance's DAO. + * @param ryaStatement the {@link RyaStatement}. + * @throws RyaDAOException + */ + public void addChildRyaStatement(RyaStatement ryaStatement) throws RyaDAOException { + addRyaStatement(ryaStatement, childAccumuloInstanceDriver.getDao()); + } + + /** + * Adds {@link RyaStatement}s to specified DAO from the provided {@link Iterator}. + * @param ryaStatementIterator the {@link RyaStatement} {@link Iterator}. + * @param dao the {@link AccumuloRyaDAO}. 
+ * @throws RyaDAOException + */ + private static void addRyaStatements(Iterator<RyaStatement> ryaStatementIterator, AccumuloRyaDAO dao) throws RyaDAOException { + dao.add(ryaStatementIterator); + } + + /** + * Adds a {@link RyaStatement} to the specified DAO. + * @param ryaStatement the {@link RyaStatement}. + * @throws RyaDAOException + */ + private static void addRyaStatement(RyaStatement ryaStatement, AccumuloRyaDAO dao) throws RyaDAOException { + dao.add(ryaStatement); + } + + /** + * @return the parent instance's {@link AccumuloInstanceDriver}. + */ + public AccumuloInstanceDriver getParentAccumuloInstanceDriver() { + return parentAccumuloInstanceDriver; + } + + /** + * @return the child instance's {@link AccumuloInstanceDriver}. + */ + public AccumuloInstanceDriver getChildAccumuloInstanceDriver() { + return childAccumuloInstanceDriver; + } + + /** + * @return {@code true} if this is a mock instance. {@code false} if this is a MiniAccumuloCluster instance. + */ + public boolean isMock() { + return isMock; + } + + /** + * @return {@code true} to create all the indices associated with a Rya deployment. + * {@code false} otherwise. + */ + public boolean shouldCreateIndices() { + return shouldCreateIndices; + } + + /** + * @return {@code true} if all the tables in the parent instance should have their + * table permissions set to read only. {@code false} if the table permission are set to write. + */ + public boolean isParentReadOnly() { + return isParentReadOnly; + } + + /** + * @return {@code true} if all the tables in the child instance should have their + * table permissions set to read only. {@code false} if the table permission are set to write. + */ + public boolean isChildReadOnly() { + return isChildReadOnly; + } + + /** + * @return {@code true} if all the child instance exists initially. + * {@code false} otherwise. 
+ */ + public boolean doesChildInitiallyExist() { + return doesChildInitiallyExist; + } + + /** + * @return the user name tied to the parent instance. + */ + public String getParentUser() { + return parentAccumuloInstanceDriver.getUser(); + } + + /** + * @return the user name tied to the child instance. + */ + public String getChildUser() { + return childAccumuloInstanceDriver.getUser(); + } + + /** + * @return the password for the parent instance's user. + */ + public String getParentPassword() { + return parentAccumuloInstanceDriver.getPassword(); + } + + /** + * @return the password for the child instance's user. + */ + public String getChildPassword() { + return childAccumuloInstanceDriver.getPassword(); + } + + /** + * @return the name of the parent instance. + */ + public String getParentInstanceName() { + return parentAccumuloInstanceDriver.getInstanceName(); + } + + /** + * @return the name of the child instance. + */ + public String getChildInstanceName() { + return childAccumuloInstanceDriver.getInstanceName(); + } + + /** + * @return the parent instance's table prefix. + */ + public String getParentTablePrefix() { + return parentAccumuloInstanceDriver.getTablePrefix(); + } + + /** + * @return the child instance's table prefix. + */ + public String getChildTablePrefix() { + return childAccumuloInstanceDriver.getTablePrefix(); + } + + /** + * @return the comma-separated authorization list for the parent instance. + */ + public String getParentAuth() { + return parentAccumuloInstanceDriver.getAuth(); + } + + /** + * @return the comma-separated authorization list for the child instance. + */ + public String getChildAuth() { + return childAccumuloInstanceDriver.getAuth(); + } + + /** + * @return the {@link Connector} to the parent instance. + */ + public Connector getParentConnector() { + return parentAccumuloInstanceDriver.getConnector(); + } + + /** + * @return the {@link Connector} to the child instance. 
+ */ + public Connector getChildConnector() { + return childAccumuloInstanceDriver.getConnector(); + } + + /** + * @return the {@link AccumuloRyaDAO} for the parent instance. + */ + public AccumuloRyaDAO getParentDao() { + return parentAccumuloInstanceDriver.getDao(); + } + + /** + * @return the {@link AccumuloRyaDAO} for the child instance. + */ + public AccumuloRyaDAO getChildDao() { + return childAccumuloInstanceDriver.getDao(); + } + + /** + * @return the {@link SecurityOperations} for the parent instance. + */ + public SecurityOperations getParentSecOps() { + return parentAccumuloInstanceDriver.getSecOps(); + } + + /** + * @return the {@link SecurityOperations} for the child instance. + */ + public SecurityOperations getChildSecOps() { + return childAccumuloInstanceDriver.getSecOps(); + } + + /** + * @return the {@link AccumuloRdfConfiguration} for the parent instance. + */ + public AccumuloRdfConfiguration getParentConfig() { + return parentAccumuloInstanceDriver.getConfig(); + } + + /** + * @return the {@link AccumuloRdfConfiguration} for the child instance. + */ + public AccumuloRdfConfiguration getChildConfig() { + return childAccumuloInstanceDriver.getConfig(); + } + + /** + * @return the {@link MiniAccumuloCluster} for the parent instance or {@code null} + * if this is a {@link MockInstance}. + */ + public MiniAccumuloCluster getParentMiniAccumuloCluster() { + return parentAccumuloInstanceDriver.getMiniAccumuloCluster(); + } + + /** + * @return the {@link MiniAccumuloCluster} for the child instance or {@code null} + * if this is a {@link MockInstance}. + */ + public MiniAccumuloCluster getChildMiniAccumuloCluster() { + return childAccumuloInstanceDriver.getMiniAccumuloCluster(); + } + + /** + * @return the {@link MockInstance} for the parent instance or {@code null} + * if this is a {@link MiniAccumuloCluster}. 
+ */ + public MockInstance getParentMockInstance() { + return parentAccumuloInstanceDriver.getMockInstance(); + } + + /** + * @return the {@link MockInstance} for the child instance or {@code null} + * if this is a {@link MiniAccumuloCluster}. + */ + public MockInstance getChildMockInstance() { + return childAccumuloInstanceDriver.getMockInstance(); + } + + /** + * @return the {@link ZooKeeperInstance} for the parent instance or {@code null} if + * this is a {@link MockInstance}. + */ + public ZooKeeperInstance getParentZooKeeperInstance() { + return parentAccumuloInstanceDriver.getZooKeeperInstance(); + } + + /** + * @return the {@link ZooKeeperInstance} for the child instance or {@code null} if + * this is a {@link MockInstance}. + */ + public ZooKeeperInstance getChildZooKeeperInstance() { + return childAccumuloInstanceDriver.getZooKeeperInstance(); + } + + /** + * @return the parent {@link ZooKeepInstance} or {@link MockInstance}. + */ + public Instance getParentInstance() { + return parentAccumuloInstanceDriver.getInstance(); + } + + /** + * @return the child {@link ZooKeepInstance} or {@link MockInstance}. + */ + public Instance getChildInstance() { + return childAccumuloInstanceDriver.getInstance(); + } + + /** + * @return the comma-separated list of zoo keeper host names for the parent instance. + */ + public String getParentZooKeepers() { + return parentAccumuloInstanceDriver.getZooKeepers(); + } + + /** + * @return the comma-separated list of zoo keeper host names for the child instance. + */ + public String getChildZooKeepers() { + return childAccumuloInstanceDriver.getZooKeepers(); + } + + /** + * @return an unmodifiable map of the configuration keys and values for the parent instance. + */ + public Map<String, String> getParentConfigMap() { + return parentAccumuloInstanceDriver.getConfigMap(); + } + + /** + * @return an unmodifiable map of the configuration keys and values for the child instance. 
+ */ + public Map<String, String> getChildConfigMap() { + return childAccumuloInstanceDriver.getConfigMap(); + } + + /** + * @return an unmodifiable list of the table names and indices for the parent instance. + */ + public List<String> getParentTableList() { + return parentAccumuloInstanceDriver.getTableList(); + } + + /** + * @return an unmodifiable list of the table names and indices for the child instance. + */ + public List<String> getChildTableList() { + return childAccumuloInstanceDriver.getTableList(); + } + + /** + * @return the {@link MiniAccumuloCluster} temporary directory for the parent instance or {@code null} + * if it's a {@link MockInstance}. + */ + public File getParentTempDir() { + return parentAccumuloInstanceDriver.getTempDir(); + } + + /** + * @return the {@link MiniAccumuloCluster} temporary directory for the child instance or {@code null} + * if it's a {@link MockInstance}. + */ + public File getChildTempDir() { + return childAccumuloInstanceDriver.getTempDir(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/MiniAccumuloClusterDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/MiniAccumuloClusterDriver.java b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/MiniAccumuloClusterDriver.java new file mode 100644 index 0000000..92c451d --- /dev/null +++ b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/driver/MiniAccumuloClusterDriver.java @@ -0,0 +1,161 @@ +package mvm.rya.accumulo.mr.merge.driver; + +import static mvm.rya.accumulo.mr.merge.util.TestUtils.LAST_MONTH; +import static mvm.rya.accumulo.mr.merge.util.TestUtils.TODAY; +import static mvm.rya.accumulo.mr.merge.util.TestUtils.createRyaStatement; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + 
+import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.log4j.Logger; + +import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils; +import mvm.rya.accumulo.mr.merge.util.TestUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; + +/** + * Runs a {@link MiniAccumuloCluster}. + */ +public class MiniAccumuloClusterDriver extends AccumuloDualInstanceDriver { + private static final Logger log = Logger.getLogger(MiniAccumuloClusterDriver.class); + + /** + * {@code true} to configure the cluster for merging. {@code false} to configure + * the cluster for copying. + */ + private static final boolean IS_MERGE_SETUP = true; + + private static boolean keepRunning = true; + + /** + * Creates a new instance of {@link MiniAccumuloClusterDriver}. + */ + public MiniAccumuloClusterDriver() { + super(false, !IS_MERGE_SETUP, !IS_MERGE_SETUP, IS_MERGE_SETUP, IS_MERGE_SETUP); + } + + private void writeStatements() throws RyaDAOException, IOException { + // This statement was in both parent/child instances a month ago and is before the start time of yesterday + // but it was left alone. It should remain in the parent after merging. + RyaStatement ryaStatementOutOfTimeRange = createRyaStatement("coach", "called", "timeout", LAST_MONTH); + + // This statement was in both parent/child instances a month ago but after the start time of yesterday + // the parent deleted it and the child still has it. It should stay deleted in the parent after merging. + RyaStatement ryaStatementParentDeletedAfter = createRyaStatement("parent", "deleted", "after", LAST_MONTH); + + // This statement was added by the parent after the start time of yesterday and doesn't exist in the child. + // It should stay in the parent after merging. 
+ RyaStatement ryaStatementParentAddedAfter = createRyaStatement("parent", "added", "after", TODAY); + + // This statement was in both parent/child instances a month ago but after the start time of yesterday + // the child deleted it and the parent still has it. It should be deleted from the parent after merging. + RyaStatement ryaStatementChildDeletedAfter = createRyaStatement("child", "deleted", "after", LAST_MONTH); + + // This statement was added by the child after the start time of yesterday and doesn't exist in the parent. + // It should be added to the parent after merging. + RyaStatement ryaStatementChildAddedAfter = createRyaStatement("child", "added", "after", TODAY); + + // This statement was modified by the child after the start of yesterday (The timestamp changes after updating) + // It should be updated in the parent to match the child. + RyaStatement ryaStatementUpdatedByChild = createRyaStatement("bob", "catches", "ball", LAST_MONTH); + + RyaStatement ryaStatementUntouchedByChild = createRyaStatement("bill", "talks to", "john", LAST_MONTH); + + RyaStatement ryaStatementDeletedByChild = createRyaStatement("susan", "eats", "burgers", LAST_MONTH); + + RyaStatement ryaStatementAddedByChild = createRyaStatement("ronnie", "plays", "guitar", TODAY); + + // This statement was modified by the child to change the column visibility. + // The parent should combine the child's visibility with its visibility. + RyaStatement ryaStatementVisibilityDifferent = createRyaStatement("I", "see", "you", LAST_MONTH); + ryaStatementVisibilityDifferent.setColumnVisibility(PARENT_COLUMN_VISIBILITY.getExpression()); + + List<RyaStatement> parentStatements = new ArrayList<>(); + List<RyaStatement> childStatements = new ArrayList<>(); + + // Setup initial parent instance with 7 rows + // This is the state of the parent data (as it is today) before merging occurs which will use the specified start time of yesterday. 
+ parentStatements.add(ryaStatementOutOfTimeRange); // Merging should keep statement + parentStatements.add(ryaStatementUpdatedByChild); // Merging should update statement + parentStatements.add(ryaStatementUntouchedByChild); // Merging should keep statement + parentStatements.add(ryaStatementDeletedByChild); // Merging should delete statement + parentStatements.add(ryaStatementVisibilityDifferent); // Merging should update statement + parentStatements.add(ryaStatementParentAddedAfter); // Merging should keep statement + parentStatements.add(ryaStatementChildDeletedAfter); // Merging should delete statement + + addParentRyaStatements(parentStatements); + + // Simulate the child coming back with a modified data set before the merging occurs. + // (1 updated row, 1 row left alone because it was unchanged, 1 row outside time range, + // 1 row deleted, 1 new row added, 1 modified visibility, 1 deleted by child, 1 added by child). + // There should be 5 rows in the child instance (4 which will be scanned over from the start time). 
+ ryaStatementUpdatedByChild.setObject(TestUtils.createRyaUri("football")); + ryaStatementUpdatedByChild.setTimestamp(TODAY.getTime()); + ryaStatementVisibilityDifferent.setColumnVisibility(CHILD_COLUMN_VISIBILITY.getExpression()); + childStatements.add(ryaStatementOutOfTimeRange); + childStatements.add(ryaStatementUpdatedByChild); + childStatements.add(ryaStatementUntouchedByChild); + childStatements.add(ryaStatementAddedByChild); // Merging should add statement + childStatements.add(ryaStatementVisibilityDifferent); + childStatements.add(ryaStatementParentDeletedAfter); + childStatements.add(ryaStatementChildAddedAfter); // Merging should add statement + + if (IS_MERGE_SETUP) { + addChildRyaStatements(childStatements); + } + + + AccumuloRyaUtils.printTable(PARENT_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, getParentConfig()); + AccumuloRyaUtils.printTable(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, getChildConfig()); + } + + public static void main(String args[]) { + log.info("Setting up MiniAccumulo Cluster"); + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + log.fatal("Uncaught exception in " + thread.getName(), throwable); + } + }); + + final MiniAccumuloClusterDriver macDriver = new MiniAccumuloClusterDriver(); + try { + macDriver.setUp(); + log.info("Populating clusters"); + macDriver.writeStatements(); + log.info("MiniAccumuloClusters running and populated"); + } catch (Exception e) { + log.error("Error setting up and writing statements", e); + keepRunning = false; + } + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + log.info("Shutting down..."); + try { + macDriver.tearDown(); + } catch (Exception e) { + log.error("Error while shutting down", e); + } finally { + keepRunning = false; + log.info("Done shutting down"); + } + } + }); + + while(keepRunning) { + 
try { + Thread.sleep(2000); + } catch (InterruptedException e) { + log.error("Interrupted exception while running MiniAccumuloClusterDriver", e); + keepRunning = false; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/util/TestUtils.java b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/util/TestUtils.java new file mode 100644 index 0000000..2758efd --- /dev/null +++ b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/util/TestUtils.java @@ -0,0 +1,230 @@ +package mvm.rya.accumulo.mr.merge.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Date; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; + +/** + * Utility methods for testing merging/copying. + */ +public final class TestUtils { + /** + * 'LAST_MONTH' is the testing timestamp for all data that was in the both the parent and child when the child was copied from the parent. + * So, after the child was created it should contain data with this timestamp too before merging. + */ + public static final Date LAST_MONTH = new Date(monthBefore(System.currentTimeMillis())); + + /** + * 'YESTERDAY' is the testing start timestamp specified by the user to use for merging, all data with timestamps + * after yesterday should be merged from the child to the parent. + */ + public static final Date YESTERDAY = new Date(dayBefore(System.currentTimeMillis())); + + /** + * 'TODAY' is when the merge process is actually happening. 
+ */ + public static final Date TODAY = new Date(); + + /** + * Indicates when something occurred: before or after. + */ + public static enum Occurrence { + BEFORE(-1), + AFTER(1); + + private int sign; + + /** + * Creates a new {@link Occurrence}. + * @param sign the sign value: positive or negative. + */ + private Occurrence(int sign) { + this.sign = sign; + } + + /** + * @return the sign value: positive or negative. + */ + public int getSign() { + return sign; + } + } + + /** + * A {@code CalendarUnit} represents time durations at a given unit of + * granularity and provides utility methods to convert to milliseconds. This + * is similar to {@code java.util.concurrent.TimeUnit} but adds the week, + * month, and year units and only converts to milliseconds. + */ + public static enum CalendarUnit { + MILLISECOND(1L), + SECOND(1000L * MILLISECOND.getMilliseconds()), + MINUTE(60L * SECOND.getMilliseconds()), + HOUR(60L * MINUTE.getMilliseconds()), + DAY(24L * HOUR.getMilliseconds()), + WEEK(7L * DAY.getMilliseconds()), + MONTH(31L * DAY.getMilliseconds()), + YEAR(365L * DAY.getMilliseconds()); + + private long milliseconds; + + /** + * Creates a new {@link CalendarUnit}. + * @param milliseconds the milliseconds value of this unit. + */ + private CalendarUnit(long milliseconds) { + this.milliseconds = milliseconds; + } + + /** + * @return the milliseconds value of this unit. + */ + public long getMilliseconds() { + return milliseconds; + } + } + + private static final String NAMESPACE = "#:";//"urn:x:x#"; //"urn:test:litdups#"; + + /** + * Private constructor to prevent instantiation. + */ + private TestUtils() { + } + + /** + * Finds the time value for one day before the specified time. + * @param time the time to find the new time occurrence from. (in milliseconds) + * @return the value of one day before the specified {@code time}. 
(in milliseconds) + */ + public static long dayBefore(long time) { + return timeFrom(time, 1, CalendarUnit.DAY, Occurrence.BEFORE); + } + + /** + * Finds the date value for one day before the specified date. + * @param date the {@link Date} to find the new time occurrence from. + * @return the {@link Date} value of one day before the specified {@code date}. + */ + public static Date dayBefore(Date date) { + return dateFrom(date, 1, CalendarUnit.DAY, Occurrence.BEFORE); + } + + /** + * Finds the time value for one month (a fixed 31 days) before the specified time. + * @param time the time to find the new time occurrence from. (in milliseconds) + * @return the value of one month before the specified {@code time}. (in milliseconds) + */ + public static long monthBefore(long time) { + return timeFrom(time, 1, CalendarUnit.MONTH, Occurrence.BEFORE); + } + + /** + * Finds the date value for one month (a fixed 31 days) before the specified date. + * @param date the {@link Date} to find the new time occurrence from. + * @return the {@link Date} value of one month before the specified {@code date}. + */ + public static Date monthBefore(Date date) { + return dateFrom(date, 1, CalendarUnit.MONTH, Occurrence.BEFORE); + } + + /** + * Determines the time from the specified duration before or after the provided time. + * For example, this can be used to find the time value of something that happened 1 month before + * the current time. + * @param time the time to find the new time occurrence from. (in milliseconds) + * @param duration the duration offset from the specified {@code time}. + * @param unit the {@link CalendarUnit} of the duration. + * @param occurrence when the new time takes place, before or after. + * @return the value of the new time. 
(in milliseconds) + */ + public static long timeFrom(long time, long duration, CalendarUnit unit, Occurrence occurrence) { + long durationInMillis = occurrence.getSign() * duration * unit.getMilliseconds(); + long newTime = time + durationInMillis; + return newTime; + } + + /** + * Determines the date from the specified duration before or after the provided date. + * For example, this can be used to find the date of something that happened 1 month before + * the current date. + * @param date the {@link Date} to find the new date occurrence from. + * @param duration the duration offset from the specified {@code date}. + * @param unit the {@link CalendarUnit} of the duration. + * @param occurrence when the new date takes place, before or after. + * @return the value of the new {@link Date}. + */ + public static Date dateFrom(Date date, long duration, CalendarUnit unit, Occurrence occurrence) { + long time = timeFrom(date.getTime(), duration, unit, occurrence); + Date newDate = new Date(time); + return newDate; + } + + /** + * Checks if a {@link RyaStatement} is in the specified instance's DAO. + * @param description the description message for the statement. + * @param verifyResultCount the expected number of matches. + * @param matchStatement the {@link RyaStatement} to match. + * @param dao the {@link AccumuloRyaDAO}. + * @param config the {@link AccumuloRdfConfiguration} for the instance. 
+ * @throws RyaDAOException if the query or the iteration over its results fails. + */ + public static void assertStatementInInstance(String description, int verifyResultCount, RyaStatement matchStatement, AccumuloRyaDAO dao, AccumuloRdfConfiguration config) throws RyaDAOException { + CloseableIteration<RyaStatement, RyaDAOException> iter = dao.getQueryEngine().query(matchStatement, config); + int count = 0; + while (iter.hasNext()) { + RyaStatement statement = iter.next(); + assertTrue(description + " - match subject: " + matchStatement, matchStatement.getSubject().equals(statement.getSubject())); + assertTrue(description + " - match predicate: " + matchStatement, matchStatement.getPredicate().equals(statement.getPredicate())); + assertTrue(description + " - match object match: " + matchStatement, matchStatement.getObject().equals(statement.getObject())); + count++; + } + iter.close(); + assertEquals(description+" - Match Counts.", verifyResultCount, count); + } + + /** + * Creates a {@link RyaURI} for the specified local name. + * @param localName the URI's local name. + * @return the {@link RyaURI}. + */ + public static RyaURI createRyaUri(String localName) { + return AccumuloRyaUtils.createRyaUri(NAMESPACE, localName); + } + + /** + * Creates a {@link RyaStatement} from the specified subject, predicate, and object. + * @param subject the subject. + * @param predicate the predicate. + * @param object the object. + * @param date the {@link Date} to use for the key's timestamp; if {@code null}, no timestamp is set on the statement. + * @return the {@link RyaStatement}. + */ + public static RyaStatement createRyaStatement(String subject, String predicate, String object, Date date) { + RyaURI subjectUri = createRyaUri(subject); + RyaURI predicateUri = createRyaUri(predicate); + RyaURI objectUri = createRyaUri(object); + RyaStatement ryaStatement = new RyaStatement(subjectUri, predicateUri, objectUri); + if (date != null) { + ryaStatement.setTimestamp(date.getTime()); + } + return ryaStatement; + } + + /** + * Copies a {@link RyaStatement} into a new {@link RyaStatement}. 
+ * @param s the {@link RyaStatement} to copy. + * @return the newly copied {@link RyaStatement}. + */ + public static RyaStatement copyRyaStatement(RyaStatement s) { + return new RyaStatement(s.getSubject(), s.getPredicate(), s.getObject(), s.getContext(), s.getQualifer(), s.getColumnVisibility(), s.getValue(), s.getTimestamp()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/copy_tool.bat ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/copy_tool.bat b/extras/rya.merger/startup_scripts/copy_tool.bat new file mode 100644 index 0000000..b8a91ff --- /dev/null +++ b/extras/rya.merger/startup_scripts/copy_tool.bat @@ -0,0 +1,9 @@ +@echo off + +echo "Launching Copy Tool..." + +:: Using a wildcard in the jar filename may not work in some Windows environments, +:: so use a hard-coded filename for the jar if necessary. +SET JAR_NAME=rya.merger-*.jar + +java -Xms256m -Xmx1024M -Dlog4j.configuration="file:config/copy_tool_log4j.xml" -cp %JAR_NAME% mvm.rya.accumulo.mr.merge.CopyTool -conf config/copy_tool_configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/copy_tool.sh ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/copy_tool.sh b/extras/rya.merger/startup_scripts/copy_tool.sh new file mode 100644 index 0000000..b1a24d2 --- /dev/null +++ b/extras/rya.merger/startup_scripts/copy_tool.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "Launching Copy Tool..." 
+ +java -Xms256m -Xmx1024M -Dlog4j.configuration="file:config/copy_tool_log4j.xml" -cp rya.merger-*.jar mvm.rya.accumulo.mr.merge.CopyTool -conf config/copy_tool_configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/hadoop_copy_tool.bat ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/hadoop_copy_tool.bat b/extras/rya.merger/startup_scripts/hadoop_copy_tool.bat new file mode 100644 index 0000000..81f7760 --- /dev/null +++ b/extras/rya.merger/startup_scripts/hadoop_copy_tool.bat @@ -0,0 +1,9 @@ +@echo off + +echo "Launching Copy Tool..." + +:: Using a wildcard in the jar filename may not work in some Windows environments, +:: so use a hard-coded filename for the jar if necessary. +SET JAR_NAME=rya.merger-*-shaded.jar + +hadoop jar %JAR_NAME% mvm.rya.accumulo.mr.merge.CopyTool -conf config/copy_tool_configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/hadoop_copy_tool.sh ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/hadoop_copy_tool.sh b/extras/rya.merger/startup_scripts/hadoop_copy_tool.sh new file mode 100644 index 0000000..30a8820 --- /dev/null +++ b/extras/rya.merger/startup_scripts/hadoop_copy_tool.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +echo "Launching Copy Tool..." + +class=mvm.rya.accumulo.mr.merge.CopyTool + +command="${ACCUMULO_HOME}/bin/tool.sh" +if [ ! 
-x $command ]; then + export HADOOP_CLASSPATH="${HADOOP_CLASSPATH}:lib/*" + command="hadoop jar" +fi + +$command rya.merger-*-shaded.jar mvm.rya.accumulo.mr.merge.CopyTool -conf config/copy_tool_configuration.xml http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/hadoop_merge_tool.bat ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/hadoop_merge_tool.bat b/extras/rya.merger/startup_scripts/hadoop_merge_tool.bat new file mode 100644 index 0000000..b1c9235 --- /dev/null +++ b/extras/rya.merger/startup_scripts/hadoop_merge_tool.bat @@ -0,0 +1,9 @@ +@echo off + +echo "Launching Merge Tool..." + +:: Using a wildcard in the jar filename may not work in some Windows environments, +:: so use a hard-coded filename for the jar if necessary. +SET JAR_NAME=rya.merger-*-shaded.jar + +hadoop jar %JAR_NAME% mvm.rya.accumulo.mr.merge.MergeTool -conf config/configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/hadoop_merge_tool.sh ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/hadoop_merge_tool.sh b/extras/rya.merger/startup_scripts/hadoop_merge_tool.sh new file mode 100644 index 0000000..b5660e3 --- /dev/null +++ b/extras/rya.merger/startup_scripts/hadoop_merge_tool.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "Launching Merge Tool..." 
+ +hadoop jar rya.merger-*-shaded.jar mvm.rya.accumulo.mr.merge.MergeTool -conf config/configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/merge_tool.bat ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/merge_tool.bat b/extras/rya.merger/startup_scripts/merge_tool.bat new file mode 100644 index 0000000..36b73ce --- /dev/null +++ b/extras/rya.merger/startup_scripts/merge_tool.bat @@ -0,0 +1,9 @@ +@echo off + +echo "Launching Merge Tool..." + +:: Using a wildcard in the jar filename may not work in some Windows environments, +:: so use a hard-coded filename for the jar if necessary. +SET JAR_NAME=rya.merger-*.jar + +java -Xms256m -Xmx1024M -Dlog4j.configuration="file:config/log4j.xml" -cp %JAR_NAME% mvm.rya.accumulo.mr.merge.MergeTool -conf config/configuration.xml \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/startup_scripts/merge_tool.sh ---------------------------------------------------------------------- diff --git a/extras/rya.merger/startup_scripts/merge_tool.sh b/extras/rya.merger/startup_scripts/merge_tool.sh new file mode 100644 index 0000000..83f8932 --- /dev/null +++ b/extras/rya.merger/startup_scripts/merge_tool.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "Launching Merge Tool..." + +java -Xms256m -Xmx1024M -Dlog4j.configuration="file:config/log4j.xml" -cp rya.merger-*.jar mvm.rya.accumulo.mr.merge.MergeTool -conf config/configuration.xml \ No newline at end of file
