http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java deleted file mode 100644 index 48dfdbd..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.replication; - -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.cluster.ClusterUser; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.security.tokens.KerberosToken; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.harness.AccumuloITBase; -import org.apache.accumulo.harness.MiniClusterConfigurationCallback; -import org.apache.accumulo.harness.MiniClusterHarness; -import org.apache.accumulo.harness.TestingKdc; -import org.apache.accumulo.master.replication.SequentialWorkAssigner; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.minicluster.impl.ProcessReference; -import org.apache.accumulo.server.replication.ReplicaSystemFactory; -import org.apache.accumulo.test.functional.KerberosIT; -import org.apache.accumulo.tserver.TabletServer; -import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - 
-import com.google.common.collect.Iterators; - -/** - * Ensure that replication occurs using keytabs instead of password (not to mention SASL) - */ -public class KerberosReplicationIT extends AccumuloITBase { - private static final Logger log = LoggerFactory.getLogger(KerberosIT.class); - - private static TestingKdc kdc; - private static String krbEnabledForITs = null; - private static ClusterUser rootUser; - - @BeforeClass - public static void startKdc() throws Exception { - kdc = new TestingKdc(); - kdc.start(); - krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); - if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { - System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); - } - rootUser = kdc.getRootUser(); - } - - @AfterClass - public static void stopKdc() throws Exception { - if (null != kdc) { - kdc.stop(); - } - if (null != krbEnabledForITs) { - System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); - } - } - - private MiniAccumuloClusterImpl primary, peer; - private String PRIMARY_NAME = "primary", PEER_NAME = "peer"; - - @Override - protected int defaultTimeoutSeconds() { - return 60 * 3; - } - - private MiniClusterConfigurationCallback getConfigCallback(final String name) { - return new MiniClusterConfigurationCallback() { - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { - cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); - cfg.setProperty(Property.GC_CYCLE_START, "1s"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); - cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); - cfg.setProperty(Property.REPLICATION_NAME, name); - cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); - 
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); - cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); - coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - }; - } - - @Before - public void setup() throws Exception { - MiniClusterHarness harness = new MiniClusterHarness(); - - // Create a primary and a peer instance, both with the same "root" user - primary = harness.create(getClass().getName(), testName.getMethodName(), new PasswordToken("unused"), getConfigCallback(PRIMARY_NAME), kdc); - primary.start(); - - peer = harness.create(getClass().getName(), testName.getMethodName() + "_peer", new PasswordToken("unused"), getConfigCallback(PEER_NAME), kdc); - peer.start(); - - // Enable kerberos auth - Configuration conf = new Configuration(false); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(conf); - } - - @After - public void teardown() throws Exception { - if (null != peer) { - peer.stop(); - } - if (null != primary) { - primary.stop(); - } - } - - @Test - public void dataReplicatedToCorrectTable() throws Exception { - // Login as the root user - UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); - - final KerberosToken token = new KerberosToken(); - final Connector primaryConn = primary.getConnector(rootUser.getPrincipal(), token); - final Connector peerConn = peer.getConnector(rootUser.getPrincipal(), token); - - ClusterUser replicationUser = kdc.getClientPrincipal(0); - - // Create user for replication to the peer - peerConn.securityOperations().createLocalUser(replicationUser.getPrincipal(), null); - - primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + PEER_NAME, replicationUser.getPrincipal()); - primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_KEYTAB.getKey() + PEER_NAME, 
replicationUser.getKeytab().getAbsolutePath()); - - // ...peer = AccumuloReplicaSystem,instanceName,zookeepers - primaryConn.instanceOperations().setProperty( - Property.REPLICATION_PEERS.getKey() + PEER_NAME, - ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, - AccumuloReplicaSystem.buildConfiguration(peerConn.getInstance().getInstanceName(), peerConn.getInstance().getZooKeepers()))); - - String primaryTable1 = "primary", peerTable1 = "peer"; - - // Create tables - primaryConn.tableOperations().create(primaryTable1); - String masterTableId1 = primaryConn.tableOperations().tableIdMap().get(primaryTable1); - Assert.assertNotNull(masterTableId1); - - peerConn.tableOperations().create(peerTable1); - String peerTableId1 = peerConn.tableOperations().tableIdMap().get(peerTable1); - Assert.assertNotNull(peerTableId1); - - // Grant write permission - peerConn.securityOperations().grantTablePermission(replicationUser.getPrincipal(), peerTable1, TablePermission.WRITE); - - // Replicate this table to the peerClusterName in a table with the peerTableId table id - primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION.getKey(), "true"); - primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION_TARGET.getKey() + PEER_NAME, peerTableId1); - - // Write some data to table1 - BatchWriter bw = primaryConn.createBatchWriter(primaryTable1, new BatchWriterConfig()); - long masterTable1Records = 0l; - for (int rows = 0; rows < 2500; rows++) { - Mutation m = new Mutation(primaryTable1 + rows); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - masterTable1Records++; - } - bw.addMutation(m); - } - - bw.close(); - - log.info("Wrote all data to primary cluster"); - - Set<String> filesFor1 = primaryConn.replicationOperations().referencedFiles(primaryTable1); - - // Restart the tserver to force a close on the WAL - for (ProcessReference proc : 
primary.getProcesses().get(ServerType.TABLET_SERVER)) { - primary.killProcess(ServerType.TABLET_SERVER, proc); - } - primary.exec(TabletServer.class); - - log.info("Restarted the tserver"); - - // Read the data -- the tserver is back up and running and tablets are assigned - Iterators.size(primaryConn.createScanner(primaryTable1, Authorizations.EMPTY).iterator()); - - // Wait for both tables to be replicated - log.info("Waiting for {} for {}", filesFor1, primaryTable1); - primaryConn.replicationOperations().drain(primaryTable1, filesFor1); - - long countTable = 0l; - for (Entry<Key,Value> entry : peerConn.createScanner(peerTable1, Authorizations.EMPTY)) { - countTable++; - Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() - .startsWith(primaryTable1)); - } - - log.info("Found {} records in {}", countTable, peerTable1); - Assert.assertEquals(masterTable1Records, countTable); - } -}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java deleted file mode 100644 index b6888db..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ /dev/null @@ -1,731 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test.replication; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; -import org.apache.accumulo.core.replication.ReplicationTable; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.master.replication.SequentialWorkAssigner; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.minicluster.impl.ProcessReference; -import org.apache.accumulo.server.replication.ReplicaSystemFactory; -import org.apache.accumulo.server.replication.StatusUtil; -import org.apache.accumulo.server.replication.proto.Replication.Status; -import 
org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.accumulo.tserver.TabletServer; -import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Iterators; - -/** - * Replication tests which start at least two MAC instances and replicate data between them - */ -public class MultiInstanceReplicationIT extends ConfigurableMacBase { - private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class); - - private ExecutorService executor; - - @Override - public int defaultTimeoutSeconds() { - return 10 * 60; - } - - @Before - public void createExecutor() { - executor = Executors.newSingleThreadExecutor(); - } - - @After - public void stopExecutor() { - if (null != executor) { - executor.shutdownNow(); - } - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M"); - cfg.setProperty(Property.GC_CYCLE_START, "1s"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "5s"); - cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); - cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M"); - cfg.setProperty(Property.REPLICATION_NAME, "master"); - cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName()); - cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - /** - * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the 
other MAC used for replication - */ - private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { - // Set the same SSL information from the primary when present - Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); - if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { - Map<String,String> peerSiteConfig = new HashMap<String,String>(); - peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); - String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); - Assert.assertNotNull("Keystore Path was null", keystorePath); - peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); - String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); - Assert.assertNotNull("Truststore Path was null", truststorePath); - peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); - - // Passwords might be stored in CredentialProvider - String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); - if (null != keystorePassword) { - peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); - } - String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); - if (null != truststorePassword) { - peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); - } - - System.out.println("Setting site configuration for peer " + peerSiteConfig); - peerCfg.setSiteConfig(peerSiteConfig); - } - - // Use the CredentialProvider if the primary also uses one - String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); - if (null != credProvider) { - Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); - peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); - 
peerCfg.setSiteConfig(peerSiteConfig); - } - } - - @Test(timeout = 10 * 60 * 1000) - public void dataWasReplicatedToThePeer() throws Exception { - MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), - ROOT_PASSWORD); - peerCfg.setNumTservers(1); - peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - - updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); - - MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); - - peerCluster.start(); - - try { - final Connector connMaster = getConnector(); - final Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); - - ReplicationTable.setOnline(connMaster); - - String peerUserName = "peer", peerPassword = "foo"; - - String peerClusterName = "peer"; - - connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - - // ...peer = AccumuloReplicaSystem,instanceName,zookeepers - connMaster.instanceOperations().setProperty( - Property.REPLICATION_PEERS.getKey() + peerClusterName, - ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, - AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); - - final String masterTable = "master", peerTable = "peer"; - - connMaster.tableOperations().create(masterTable); - String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); - Assert.assertNotNull(masterTableId); - - connPeer.tableOperations().create(peerTable); - String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); - 
Assert.assertNotNull(peerTableId); - - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); - - // Replicate this table to the peerClusterName in a table with the peerTableId table id - connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); - - // Write some data to table1 - BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); - for (int rows = 0; rows < 5000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); - - log.info("Wrote all data to master cluster"); - - final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable); - - log.info("Files to replicate: " + filesNeedingReplication); - - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } - cluster.exec(TabletServer.class); - - log.info("TabletServer restarted"); - Iterators.size(ReplicationTable.getScanner(connMaster).iterator()); - log.info("TabletServer is online"); - - while (!ReplicationTable.isOnline(connMaster)) { - log.info("Replication table still offline, waiting"); - Thread.sleep(5000); - } - - log.info(""); - log.info("Fetching metadata records:"); - for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { - if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { - log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); - } else { - log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); - } - } - - log.info(""); - 
log.info("Fetching replication records:"); - for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { - log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); - } - - Future<Boolean> future = executor.submit(new Callable<Boolean>() { - - @Override - public Boolean call() throws Exception { - connMaster.replicationOperations().drain(masterTable, filesNeedingReplication); - log.info("Drain completed"); - return true; - } - - }); - - try { - future.get(60, TimeUnit.SECONDS); - } catch (TimeoutException e) { - future.cancel(true); - Assert.fail("Drain did not finish within 60 seconds"); - } finally { - executor.shutdownNow(); - } - - log.info("drain completed"); - - log.info(""); - log.info("Fetching metadata records:"); - for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) { - if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) { - log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); - } else { - log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue()); - } - } - - log.info(""); - log.info("Fetching replication records:"); - for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { - log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); - } - - Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); - Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); - Entry<Key,Value> masterEntry = null, peerEntry = null; - while (masterIter.hasNext() && peerIter.hasNext()) { - masterEntry = masterIter.next(); - peerEntry = peerIter.next(); - Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, - masterEntry.getKey().compareTo(peerEntry.getKey(), 
PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); - Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); - } - - log.info("Last master entry: " + masterEntry); - log.info("Last peer entry: " + peerEntry); - - Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); - Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); - } finally { - peerCluster.stop(); - } - } - - @Test - public void dataReplicatedToCorrectTable() throws Exception { - MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), - ROOT_PASSWORD); - peerCfg.setNumTservers(1); - peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - - updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); - - MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); - - peer1Cluster.start(); - - try { - Connector connMaster = getConnector(); - Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); - - String peerClusterName = "peer"; - String peerUserName = "peer", peerPassword = "foo"; - - // Create local user - connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - - // ...peer = AccumuloReplicaSystem,instanceName,zookeepers - connMaster.instanceOperations().setProperty( - Property.REPLICATION_PEERS.getKey() + peerClusterName, - ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, - AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); - - String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = 
"master2", peerTable2 = "peer2"; - - // Create tables - connMaster.tableOperations().create(masterTable1); - String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); - Assert.assertNotNull(masterTableId1); - - connMaster.tableOperations().create(masterTable2); - String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); - Assert.assertNotNull(masterTableId2); - - connPeer.tableOperations().create(peerTable1); - String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); - Assert.assertNotNull(peerTableId1); - - connPeer.tableOperations().create(peerTable2); - String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); - Assert.assertNotNull(peerTableId2); - - // Grant write permission - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); - - // Replicate this table to the peerClusterName in a table with the peerTableId table id - connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); - - connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); - - // Write some data to table1 - BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); - long masterTable1Records = 0l; - for (int rows = 0; rows < 2500; rows++) { - Mutation m = new Mutation(masterTable1 + rows); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - masterTable1Records++; - } - bw.addMutation(m); - } - - bw.close(); - - // 
Write some data to table2 - bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); - long masterTable2Records = 0l; - for (int rows = 0; rows < 2500; rows++) { - Mutation m = new Mutation(masterTable2 + rows); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - masterTable2Records++; - } - bw.addMutation(m); - } - - bw.close(); - - log.info("Wrote all data to master cluster"); - - Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles( - masterTable2); - - log.info("Files to replicate for table1: " + filesFor1); - log.info("Files to replicate for table2: " + filesFor2); - - // Restart the tserver to force a close on the WAL - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } - cluster.exec(TabletServer.class); - - log.info("Restarted the tserver"); - - // Read the data -- the tserver is back up and running - Iterators.size(connMaster.createScanner(masterTable1, Authorizations.EMPTY).iterator()); - - while (!ReplicationTable.isOnline(connMaster)) { - log.info("Replication table still offline, waiting"); - Thread.sleep(5000); - } - - // Wait for both tables to be replicated - log.info("Waiting for {} for {}", filesFor1, masterTable1); - connMaster.replicationOperations().drain(masterTable1, filesFor1); - - log.info("Waiting for {} for {}", filesFor2, masterTable2); - connMaster.replicationOperations().drain(masterTable2, filesFor2); - - long countTable = 0l; - for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { - countTable++; - Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() - .startsWith(masterTable1)); - } - - log.info("Found {} records in {}", countTable, 
peerTable1); - Assert.assertEquals(masterTable1Records, countTable); - - countTable = 0l; - for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { - countTable++; - Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() - .startsWith(masterTable2)); - } - - log.info("Found {} records in {}", countTable, peerTable2); - Assert.assertEquals(masterTable2Records, countTable); - - } finally { - peer1Cluster.stop(); - } - } - - @Test - public void dataWasReplicatedToThePeerWithoutDrain() throws Exception { - MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), - ROOT_PASSWORD); - peerCfg.setNumTservers(1); - peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - - updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); - - MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg); - - peerCluster.start(); - - Connector connMaster = getConnector(); - Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); - - String peerUserName = "repl"; - String peerPassword = "passwd"; - - // Create a user on the peer for replication to use - connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - - String peerClusterName = "peer"; - - // ...peer = AccumuloReplicaSystem,instanceName,zookeepers - connMaster.instanceOperations().setProperty( - Property.REPLICATION_PEERS.getKey() + peerClusterName, - ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, - AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers()))); - - // Configure the credentials we should use to authenticate ourselves to the peer for replication - 
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - - String masterTable = "master", peerTable = "peer"; - - connMaster.tableOperations().create(masterTable); - String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable); - Assert.assertNotNull(masterTableId); - - connPeer.tableOperations().create(peerTable); - String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable); - Assert.assertNotNull(peerTableId); - - // Give our replication user the ability to write to the table - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE); - - // Replicate this table to the peerClusterName in a table with the peerTableId table id - connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId); - - // Write some data to table1 - BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig()); - for (int rows = 0; rows < 5000; rows++) { - Mutation m = new Mutation(Integer.toString(rows)); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); - - log.info("Wrote all data to master cluster"); - - Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable); - - log.info("Files to replicate:" + files); - - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } - - cluster.exec(TabletServer.class); - - while (!ReplicationTable.isOnline(connMaster)) { - log.info("Replication table still offline, waiting"); - 
Thread.sleep(5000); - } - - Iterators.size(connMaster.createScanner(masterTable, Authorizations.EMPTY).iterator()); - - for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) { - log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get()))); - } - - connMaster.replicationOperations().drain(masterTable, files); - - Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY); - Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator(); - while (masterIter.hasNext() && peerIter.hasNext()) { - Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next(); - Assert.assertEquals(peerEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0, - masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)); - Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue()); - } - - Assert.assertFalse("Had more data to read from the master", masterIter.hasNext()); - Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext()); - - peerCluster.stop(); - } - - @Test - public void dataReplicatedToCorrectTableWithoutDrain() throws Exception { - MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), - ROOT_PASSWORD); - peerCfg.setNumTservers(1); - peerCfg.setInstanceName("peer"); - peerCfg.setProperty(Property.REPLICATION_NAME, "peer"); - - updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg); - - MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg); - - peer1Cluster.start(); - - try { - Connector connMaster = getConnector(); - Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)); - - String peerClusterName = "peer"; - - String peerUserName = "repl"; - String peerPassword = "passwd"; 
- - // Create a user on the peer for replication to use - connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword)); - - // Configure the credentials we should use to authenticate ourselves to the peer for replication - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName); - connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword); - - // ...peer = AccumuloReplicaSystem,instanceName,zookeepers - connMaster.instanceOperations().setProperty( - Property.REPLICATION_PEERS.getKey() + peerClusterName, - ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, - AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers()))); - - String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2"; - - connMaster.tableOperations().create(masterTable1); - String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1); - Assert.assertNotNull(masterTableId1); - - connMaster.tableOperations().create(masterTable2); - String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2); - Assert.assertNotNull(masterTableId2); - - connPeer.tableOperations().create(peerTable1); - String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1); - Assert.assertNotNull(peerTableId1); - - connPeer.tableOperations().create(peerTable2); - String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2); - Assert.assertNotNull(peerTableId2); - - // Give our replication user the ability to write to the tables - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE); - connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE); - - // Replicate this table to the peerClusterName in a table 
with the peerTableId table id - connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1); - - connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true"); - connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2); - - // Write some data to table1 - BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig()); - for (int rows = 0; rows < 2500; rows++) { - Mutation m = new Mutation(masterTable1 + rows); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); - - // Write some data to table2 - bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig()); - for (int rows = 0; rows < 2500; rows++) { - Mutation m = new Mutation(masterTable2 + rows); - for (int cols = 0; cols < 100; cols++) { - String value = Integer.toString(cols); - m.put(value, "", value); - } - bw.addMutation(m); - } - - bw.close(); - - log.info("Wrote all data to master cluster"); - - for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { - cluster.killProcess(ServerType.TABLET_SERVER, proc); - } - - cluster.exec(TabletServer.class); - - while (!ReplicationTable.isOnline(connMaster)) { - log.info("Replication table still offline, waiting"); - Thread.sleep(5000); - } - - // Wait until we fully replicated something - boolean fullyReplicated = false; - for (int i = 0; i < 10 && !fullyReplicated; i++) { - UtilWaitThread.sleep(2000); - - Scanner s = ReplicationTable.getScanner(connMaster); - WorkSection.limit(s); - for (Entry<Key,Value> entry : s) { - Status status = Status.parseFrom(entry.getValue().get()); - if (StatusUtil.isFullyReplicated(status)) { - 
fullyReplicated |= true; - } - } - } - - Assert.assertNotEquals(0, fullyReplicated); - - // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it - // Be cautious in how quickly we assert that the data is present on the peer - long countTable = 0l; - for (int i = 0; i < 10; i++) { - for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) { - countTable++; - Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() - .startsWith(masterTable1)); - } - - log.info("Found {} records in {}", countTable, peerTable1); - - if (0l == countTable) { - Thread.sleep(5000); - } else { - break; - } - } - - Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0); - - // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it - // Be cautious in how quickly we assert that the data is present on the peer - for (int i = 0; i < 10; i++) { - countTable = 0l; - for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) { - countTable++; - Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString() - .startsWith(masterTable2)); - } - - log.info("Found {} records in {}", countTable, peerTable2); - - if (0l == countTable) { - Thread.sleep(5000); - } else { - break; - } - } - - Assert.assertTrue("Found no records in " + peerTable2 + " in the peer cluster", countTable > 0); - - } finally { - peer1Cluster.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java ---------------------------------------------------------------------- diff --git 
a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java deleted file mode 100644 index 72cb569..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.accumulo.test.replication;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.HashSet;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;

/**
 * Checks, against a mini cluster configured with two tablet servers, that the replication service addresses of the tablet servers and of the master can be
 * read back from ZooKeeper.
 */
public class MultiTserverReplicationIT extends ConfigurableMacBase {
  private static final Logger log = LoggerFactory.getLogger(MultiTserverReplicationIT.class);

  /**
   * Runs the mini cluster with two tablet servers.
   */
  @Override
  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
    cfg.setNumTservers(2);
  }

  /**
   * Creates the table "foo" and scans it (expecting it to be empty), which only succeeds once a tablet server is serving requests.
   *
   * @return the {@link Instance} of the cluster the test connector is attached to
   * @throws Exception
   *           if the connector cannot be obtained or the table cannot be created or scanned
   */
  private Instance instanceWithRunningTserver() throws Exception {
    // Wait for the cluster to be up
    Connector conn = getConnector();
    Instance inst = conn.getInstance();

    // Wait for a tserver to come up to fulfill this request
    conn.tableOperations().create("foo");
    Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
    Assert.assertEquals(0, Iterables.size(s));

    return inst;
  }

  /**
   * For every tablet server listed under {@link Constants#ZTSERVERS} in ZooKeeper, reads the node of the same name under
   * {@link ReplicationConstants#ZOO_TSERVERS}, parses its content as a host:port pair, and expects as many distinct replication addresses as tablet servers.
   */
  @Test
  public void tserverReplicationServicePortsAreAdvertised() throws Exception {
    Instance inst = instanceWithRunningTserver();

    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
    Set<String> tserverHost = new HashSet<>();
    tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));

    Set<HostAndPort> replicationServices = new HashSet<>();

    for (String tserver : tserverHost) {
      try {
        byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + ReplicationConstants.ZOO_TSERVERS + "/" + tserver, null);
        HostAndPort replAddress = HostAndPort.fromString(new String(portData, UTF_8));
        replicationServices.add(replAddress);
      } catch (Exception e) {
        log.error("Could not find port for {}", tserver, e);
        // Fail the test but keep the underlying exception as the cause so it shows up in the test report
        throw new AssertionError("Did not find replication port advertisement for " + tserver, e);
      }
    }

    // Each tserver should also have an equal number of replication services running internally
    Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
  }

  /**
   * Expects exactly one master location, reads the master replication coordinator address from ZooKeeper, and checks that it differs from the master address
   * and that neither address uses port 0.
   */
  @Test
  public void masterReplicationServicePortsAreAdvertised() throws Exception {
    Instance inst = instanceWithRunningTserver();

    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());

    // Should have one master instance
    Assert.assertEquals(1, inst.getMasterLocations().size());

    // Get the master thrift service addr
    String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());

    // Get the master replication coordinator addr
    String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), UTF_8);

    // They shouldn't be the same
    Assert.assertNotEquals(masterAddr, replCoordAddr);

    // Neither should be zero as the port
    Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
    Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
  }
}