ACCUMULO-3130 Ensure MultiInstanceReplicationIT is running with SSL and credential providers
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e205e0c Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e205e0c Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e205e0c Branch: refs/heads/master Commit: 6e205e0c794c88b23c3e2a46baa3fdd0167ca5a3 Parents: e3b8ec5 Author: Josh Elser <els...@apache.org> Authored: Tue Sep 16 01:29:01 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Tue Sep 16 16:15:34 2014 -0400 ---------------------------------------------------------------------- .../accumulo/test/functional/AbstractMacIT.java | 2 +- .../test/functional/ConfigurableMacIT.java | 2 +- .../accumulo/test/functional/ExamplesIT.java | 8 +- .../accumulo/test/functional/SimpleMacIT.java | 2 +- .../replication/MultiInstanceReplicationIT.java | 141 ++++++++++++------- 5 files changed, 94 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java index 22e46ff..ce6164b 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java @@ -81,7 +81,7 @@ public abstract class AbstractMacIT { return names; } - protected static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) { + protected static void configureForEnvironment(MiniAccumuloConfigImpl cfg, File folder) { if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useSslForIT"))) { configureForSsl(cfg, folder); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java index 59b0977..67869e9 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java @@ -54,7 +54,7 @@ public class ConfigurableMacIT extends AbstractMacIT { Configuration coreSite = new Configuration(false); configure(cfg, coreSite); cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString()); - configureForEnvironment(cfg, getClass(), createSharedTestDir(this.getClass().getName() + "-ssl")); + configureForEnvironment(cfg, createSharedTestDir(this.getClass().getName() + "-ssl")); cluster = new MiniAccumuloClusterImpl(cfg); if (coreSite.size() > 0) { File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java index 7864ec8..b5d96d6 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java @@ -114,7 +114,7 @@ public class ExamplesIT extends AbstractMacIT { cfg.setDefaultMemory(cfg.getDefaultMemory() * 2, MemoryUnit.BYTE); cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString()); - configureForEnvironment(cfg, ExamplesIT.class, createSharedTestDir(ExamplesIT.class.getName() + "-ssl")); + configureForEnvironment(cfg, createSharedTestDir(ExamplesIT.class.getName() + "-ssl")); cluster = new MiniAccumuloClusterImpl(cfg); cluster.start(); @@ -228,18 +228,18 @@ public class ExamplesIT extends AbstractMacIT { bw.addMutation(m); bw.flush(); - + Iterator<Entry<Key, Value>> iter = c.createScanner(table, Authorizations.EMPTY).iterator(); assertTrue("Iterator had no results", iter.hasNext()); Entry<Key, Value> e = iter.next(); assertEquals("Results ", "1,3,4,2", e.getValue().toString()); assertFalse("Iterator had additional results", iter.hasNext()); - + m = new Mutation("foo"); m.put("a", "b", "0,20,20,2"); bw.addMutation(m); bw.close(); - + iter = c.createScanner(table, Authorizations.EMPTY).iterator(); assertTrue("Iterator had no results", iter.hasNext()); e = iter.next(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java index b166ffd..3e11653 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java @@ -55,7 +55,7 @@ public class SimpleMacIT extends AbstractMacIT { MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(folder, ROOT_PASSWORD); cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath()); cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, Boolean.TRUE.toString()); - configureForEnvironment(cfg, SimpleMacIT.class, createSharedTestDir(SimpleMacIT.class.getName() + "-ssl")); + configureForEnvironment(cfg, createSharedTestDir(SimpleMacIT.class.getName() + "-ssl")); cluster = new MiniAccumuloClusterImpl(cfg); cluster.start(); Runtime.getRuntime().addShutdownHook(new Thread() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e205e0c/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java index 3c6da2e..fcab23b 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java @@ -16,7 +16,9 @@ */ package org.apache.accumulo.test.replication; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; @@ -40,13 +42,14 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.master.replication.SequentialWorkAssigner; +import org.apache.accumulo.minicluster.MemoryUnit; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; @@ -96,20 +99,60 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { cfg.setProperty(Property.REPLICATION_NAME, "master"); cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); + cfg.setMemory(ServerType.MASTER, 1, MemoryUnit.GIGABYTE); + cfg.setMemory(ServerType.TABLET_SERVER, 256, MemoryUnit.MEGABYTE); hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); } + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + @Test(timeout = 60 * 5000) public void dataWasReplicatedToThePeer() throws Exception { MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + MiniAccumuloClusterImpl peerCluster = peerCfg.build(); peerCluster.start(); @@ -117,40 +160,40 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { try { final Connector connMaster = getConnector(); final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD); - + ReplicationTable.create(connMaster); String peerUserName = "peer", peerPassword = "foo"; - + String peerClusterName = "peer"; connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - + connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - + // ...peer = AccumuloReplicaSystem,instanceName,zookeepers connMaster.instanceOperations().setProperty( Property.REPLICATION_PEERS.getKey() + peerClusterName, ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); - + final String masterTable = "master", peerTable = "peer"; - + connMaster.tableOperations().create(masterTable); String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); Assert.assertNotNull(masterTableId); - + connPeer.tableOperations().create(peerTable); String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); Assert.assertNotNull(peerTableId); connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); - + // Replicate this table to the peerClusterName in a table with the peerTableId table id connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); - + // Write some data to table1 BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); for (int rows = 0; rows < 5000; rows++) { @@ -161,32 +204,23 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { } bw.addMutation(m); } - + bw.close(); - + log.info("Wrote all data to master cluster"); - -// log.debug(""); -// for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { -// if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { -// log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); -// } else { -// log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); -// } -// } - + final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); - + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { cluster.killProcess(ServerType.TABLET_SERVER, proc); } cluster.exec(TabletServer.class); - + log.info("TabletServer restarted"); for (@SuppressWarnings("unused") Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {} log.info("TabletServer is online"); - + log.info(""); log.info("Fetching metadata records:"); for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { @@ -196,33 +230,33 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); } } - + log.info(""); log.info("Fetching replication records:"); for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); } - + Future<Boolean> future = executor.submit(new Callable<Boolean>() { - + @Override public Boolean call() throws Exception { connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); log.info("Drain completed"); return true; } - + }); - + try { - future.get(30, TimeUnit.SECONDS); + future.get(60, TimeUnit.SECONDS); } catch (TimeoutException e) { future.cancel(true); - Assert.fail("Drain did not finish within 30 seconds"); + Assert.fail("Drain did not finish within 60 seconds"); } - + log.info("drain completed"); - + log.info(""); log.info("Fetching metadata records:"); for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { @@ -232,13 +266,13 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); } } - + log.info(""); log.info("Fetching replication records:"); for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) { log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); } - + Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); Entry<Key,Value> masterEntry = null, peerEntry = null; @@ -249,10 +283,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); } - + log.info("Last master entry: " + masterEntry); log.info("Last peer entry: " + peerEntry); - + Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); } finally { @@ -266,11 +300,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); peer1Cluster.start(); @@ -413,10 +446,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + MiniAccumuloClusterImpl peerCluster = peerCfg.build(); peerCluster.start(); @@ -512,10 +545,10 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { ROOT_PASSWORD); peerCfg.setNumTservers(1); peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); - peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); + + updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); + MiniAccumuloClusterImpl peer1Cluster = peerCfg.build(); peer1Cluster.start(); @@ -636,7 +669,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() .startsWith(masterTable1)); } - + log.info("Found {} records in {}", countTable, peerTable1); if (0l == countTable) { @@ -657,7 +690,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() .startsWith(masterTable2)); } - + log.info("Found {} records in {}", countTable, peerTable2); if (0l == countTable) {