Merge branch '1.6' Conflicts: test/src/test/java/org/apache/accumulo/test/functional/AbstractMacIT.java test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e16d55f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e16d55f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e16d55f Branch: refs/heads/master Commit: 6e16d55f1e13177a2c59ef73a076e8f41d8a695b Parents: 524a813 0532b62 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Wed Dec 10 18:52:24 2014 -0500 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Wed Dec 10 18:52:24 2014 -0500 ---------------------------------------------------------------------- .../accumulo/harness/AccumuloClusterIT.java | 2 +- .../accumulo/harness/SharedMiniClusterIT.java | 4 +- .../test/ArbitraryTablePropertiesIT.java | 5 +- .../apache/accumulo/test/ImportExportIT.java | 5 +- .../accumulo/test/SplitCancelsMajCIT.java | 5 +- .../accumulo/test/functional/AbstractMacIT.java | 93 ------------ .../test/functional/ConfigurableMacIT.java | 79 ++++++++--- .../functional/DeletedTablesDontFlushIT.java | 4 +- .../test/functional/HalfDeadTServerIT.java | 3 +- .../accumulo/test/functional/SimpleMacIT.java | 142 +------------------ .../test/replication/CyclicReplicationIT.java | 4 +- ...bageCollectorCommunicatesWithTServersIT.java | 5 +- .../test/replication/StatusCombinerMacIT.java | 5 +- 13 files changed, 84 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java index 7cc136f,0000000..aa5c164 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java @@@ -1,198 -1,0 
+1,197 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.TablePermission; - import org.apache.accumulo.test.functional.SimpleMacIT; ++import org.apache.accumulo.harness.SharedMiniClusterIT; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + - @SuppressWarnings("deprecation") - public class ArbitraryTablePropertiesIT extends SimpleMacIT { ++public class ArbitraryTablePropertiesIT extends SharedMiniClusterIT { + private static final Logger log = Logger.getLogger(ArbitraryTablePropertiesIT.class); + + @Override + protected int defaultTimeoutSeconds() { + return 30; + }; + + // Test set, get, and remove arbitrary table properties on the root account + @Test + public void setGetRemoveTablePropertyRoot() throws Exception { + log.debug("Starting setGetRemoveTablePropertyRoot test 
------------------------"); + + // make a table + final String tableName = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + // Set the property to the desired value + conn.tableOperations().setProperty(tableName, propertyName, description1); + + // Loop through properties to make sure the new property is added to the list + int count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description1)) + count++; + } + Assert.assertEquals(count, 1); + + // Set the property as something different + String description2 = "set second"; + conn.tableOperations().setProperty(tableName, propertyName, description2); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description2)) + count++; + } + Assert.assertEquals(count, 1); + + // Remove the property and make sure there is no longer a value associated with it + conn.tableOperations().removeProperty(tableName, propertyName); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + } + + // Tests set, get, and remove of user added arbitrary properties using a non-root account with permissions to alter tables + @Test + public void 
userSetGetRemoveTablePropertyWithPermission() throws Exception { + log.debug("Starting userSetGetRemoveTablePropertyWithPermission test ------------------------"); + + // Make a test username and password + String testUser = makeUserName(); + PasswordToken testPasswd = new PasswordToken("test_password"); + + // Create a root user and create the table + // Create a test user and grant that user permission to alter the table + final String tableName = getUniqueNames(1)[0]; + final Connector c = getConnector(); + c.securityOperations().createLocalUser(testUser, testPasswd); + Connector conn = c.getInstance().getConnector(testUser, testPasswd); + c.tableOperations().create(tableName); + c.securityOperations().grantTablePermission(testUser, tableName, TablePermission.ALTER_TABLE); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + // Set the property to the desired value + conn.tableOperations().setProperty(tableName, propertyName, description1); + + // Loop through properties to make sure the new property is added to the list + int count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description1)) + count++; + } + Assert.assertEquals(count, 1); + + // Set the property as something different + String description2 = "set second"; + conn.tableOperations().setProperty(tableName, propertyName, description2); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description2)) + count++; + } + Assert.assertEquals(count, 1); + + // 
Remove the property and make sure there is no longer a value associated with it + conn.tableOperations().removeProperty(tableName, propertyName); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + + } + + // Tests set and get of user added arbitrary properties using a non-root account without permissions to alter tables + @Test + public void userSetGetTablePropertyWithoutPermission() throws Exception { + log.debug("Starting userSetGetTablePropertyWithoutPermission test ------------------------"); + + // Make a test username and password + String testUser = makeUserName(); + PasswordToken testPasswd = new PasswordToken("test_password"); + + // Create a root user and create the table + // Create a test user and grant that user permission to alter the table + final String tableName = getUniqueNames(1)[0]; + final Connector c = getConnector(); + c.securityOperations().createLocalUser(testUser, testPasswd); + Connector conn = c.getInstance().getConnector(testUser, testPasswd); + c.tableOperations().create(tableName); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + + // Try to set the property to the desired value. 
+ // If able to set it, the test fails, since permission was never granted + try { + conn.tableOperations().setProperty(tableName, propertyName, description1); + Assert.fail("Was able to set property without permissions"); + } catch (AccumuloSecurityException e) {} + + // Loop through properties to make sure the new property is not added to the list + int count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + } + + static AtomicInteger userId = new AtomicInteger(0); + + static String makeUserName() { + return "user_" + userId.getAndIncrement(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java index 4d8855d,0000000..53354b1 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java +++ b/test/src/test/java/org/apache/accumulo/test/SplitCancelsMajCIT.java @@@ -1,85 -1,0 +1,84 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertTrue; + +import java.util.EnumSet; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.util.UtilWaitThread; - import org.apache.accumulo.test.functional.SimpleMacIT; ++import org.apache.accumulo.harness.SharedMiniClusterIT; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +// ACCUMULO-2862 - @SuppressWarnings("deprecation") - public class SplitCancelsMajCIT extends SimpleMacIT{ ++public class SplitCancelsMajCIT extends SharedMiniClusterIT { + + @Test(timeout = 2 * 60 * 1000) + public void test() throws Exception { + final String tableName = getUniqueNames(1)[0]; + final Connector c = getConnector(); + c.tableOperations().create(tableName); + // majc should take 100 * .5 secs + IteratorSetting it = new IteratorSetting(100, SlowIterator.class); + SlowIterator.setSleepTime(it, 500); + c.tableOperations().attachIterator(tableName, it, EnumSet.of(IteratorScope.majc)); + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + Mutation m = new Mutation("" + i); + m.put("", "", new Value()); + bw.addMutation(m); + } + bw.flush(); + // start majc + final AtomicReference<Exception> ex = new AtomicReference<Exception>(); + Thread thread = new Thread() { + @Override + public void run() { + try { + 
c.tableOperations().compact(tableName, null, null, true, true); + } catch (Exception e) { + ex.set(e); + } + } + }; + thread.start(); + + long now = System.currentTimeMillis(); + UtilWaitThread.sleep(10 * 1000); + // split the table, interrupts the compaction + SortedSet<Text> partitionKeys = new TreeSet<Text>(); + partitionKeys.add(new Text("10")); + c.tableOperations().addSplits(tableName, partitionKeys); + thread.join(); + // wait for the restarted compaction + assertTrue(System.currentTimeMillis() - now > 59 * 1000); + if (ex.get() != null) + throw ex.get(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java index e2f08cd,1c8b1f0..a921538 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConfigurableMacIT.java @@@ -34,21 -37,65 +38,66 @@@ import org.apache.accumulo.minicluster. import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; + import org.apache.accumulo.test.util.CertUtils; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.Before; - public class ConfigurableMacIT extends AbstractMacIT { - protected static final Logger log = Logger.getLogger(ConfigurableMacIT.class); + /** + * General Integration-Test base class that provides access to a {@link MiniAccumuloCluster} for testing. Tests using these typically do very disruptive things + * to the instance, and require specific configuration. 
Most tests don't need this level of control and should extend {@link AccumuloClusterIT} instead. + */ + public class ConfigurableMacIT extends AccumuloIT { + public static final Logger log = Logger.getLogger(ConfigurableMacIT.class); + + protected MiniAccumuloClusterImpl cluster; + + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {} + + protected void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {} - public MiniAccumuloClusterImpl cluster; + protected static final String ROOT_PASSWORD = "testRootPassword1"; - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {} - private static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) { ++ public static void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> testClass, File folder) { + if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useSslForIT"))) { + configureForSsl(cfg, folder); + } + if ("true".equals(System.getProperty("org.apache.accumulo.test.functional.useCredProviderForIT"))) { + cfg.setUseCredentialProvider(true); + } + } - public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception {} + protected static void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) { + Map<String,String> siteConfig = cfg.getSiteConfig(); + if ("true".equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + // already enabled; don't mess with it + return; + } + + File sslDir = new File(folder, "ssl"); + sslDir.mkdirs(); + File rootKeystoreFile = new File(sslDir, "root-" + cfg.getInstanceName() + ".jks"); + File localKeystoreFile = new File(sslDir, "local-" + cfg.getInstanceName() + ".jks"); + File publicTruststoreFile = new File(sslDir, "public-" + cfg.getInstanceName() + ".jks"); + final String rootKeystorePassword = "root_keystore_password", truststorePassword = "truststore_password"; + try { + new 
CertUtils(Property.RPC_SSL_KEYSTORE_TYPE.getDefaultValue(), "o=Apache Accumulo,cn=MiniAccumuloCluster", "RSA", 2048, "sha1WithRSAEncryption") + .createAll(rootKeystoreFile, localKeystoreFile, publicTruststoreFile, cfg.getInstanceName(), rootKeystorePassword, cfg.getRootPassword(), + truststorePassword); + } catch (Exception e) { + throw new RuntimeException("error creating MAC keystore", e); + } + + siteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + siteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), localKeystoreFile.getAbsolutePath()); + siteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), cfg.getRootPassword()); + siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), publicTruststoreFile.getAbsolutePath()); + siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + cfg.setSiteConfig(siteConfig); + } @Before public void setUp() throws Exception { @@@ -118,9 -162,4 +164,8 @@@ return MonitorUtil.getLocation(instance); } - @Override + protected ClientConfiguration getClientConfig() throws Exception { + return new ClientConfiguration(new PropertiesConfiguration(getCluster().getConfig().getClientConfFile())); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java index d342bc2,0000000..97c696e mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/DeletedTablesDontFlushIT.java @@@ -1,57 -1,0 +1,57 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.EnumSet; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.fate.util.UtilWaitThread; ++import org.apache.accumulo.harness.SharedMiniClusterIT; +import org.junit.Test; + +// ACCUMULO-2880 - @SuppressWarnings("deprecation") - public class DeletedTablesDontFlushIT extends SimpleMacIT { ++public class DeletedTablesDontFlushIT extends SharedMiniClusterIT { + + @Test(timeout = 60 * 1000) + public void test() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + IteratorSetting setting = new IteratorSetting(100, SlowIterator.class); + SlowIterator.setSleepTime(setting, 1000); + c.tableOperations().attachIterator(tableName, setting, EnumSet.of(IteratorScope.minc)); + // let the configuration change propagate through zookeeper + UtilWaitThread.sleep(1000); + + Mutation m = new Mutation("xyzzy"); + for (int i = 0; i < 100; i++) { + m.put("cf", "" + i, new Value(new byte[]{})); + } + 
BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(m); + bw.close(); + // should go fast + c.tableOperations().delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java index a85f9fb,0000000..8520f66 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java @@@ -1,331 -1,0 +1,331 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.LongCombiner.Type; +import org.apache.accumulo.core.iterators.user.SummingCombiner; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; - import org.apache.accumulo.test.functional.AbstractMacIT; ++import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * + */ +public class CyclicReplicationIT { + private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class); + + @Rule + public Timeout getTimeout() { + int scalingFactor = 1; + try { + scalingFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException exception) { + log.warn("Could not parse timeout.factor, not scaling timeout"); + } + + return new Timeout(scalingFactor * 5 * 60 * 1000); + } + + @Rule + public TestName testName = new TestName(); + + private File createTestDir(String name) { + File baseDir = new File(System.getProperty("user.dir") + "/target/mini-tests"); + baseDir.mkdirs(); + File testDir = new File(baseDir, this.getClass().getName() + "_" + testName.getMethodName() + "_" + name); + FileUtils.deleteQuietly(testDir); + testDir.mkdir(); + return testDir; + } + + private void setCoreSite(MiniAccumuloClusterImpl cluster) throws Exception { + File csFile = new File(cluster.getConfig().getConfDir(), "core-site.xml"); + if (csFile.exists()) + throw new RuntimeException(csFile + " already exist"); + + Configuration coreSite = new Configuration(false); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "core-site.xml"))); + coreSite.writeXml(out); + out.close(); + } + + /** + * Use the same SSL and credential provider configuration that is set up by AbstractMacIT for the other MAC used for replication + */ + private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) { + // Set the same SSL information from the primary when present + Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig(); + if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) { + Map<String,String> peerSiteConfig = new 
HashMap<String,String>(); + peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true"); + String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey()); + Assert.assertNotNull("Keystore Path was null", keystorePath); + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath); + String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey()); + Assert.assertNotNull("Truststore Path was null", truststorePath); + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath); + + // Passwords might be stored in CredentialProvider + String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey()); + if (null != keystorePassword) { + peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword); + } + String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey()); + if (null != truststorePassword) { + peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword); + } + + System.out.println("Setting site configuration for peer " + peerSiteConfig); + peerCfg.setSiteConfig(peerSiteConfig); + } + + // Use the CredentialProvider if the primary also uses one + String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey()); + if (null != credProvider) { + Map<String,String> peerSiteConfig = peerCfg.getSiteConfig(); + peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider); + peerCfg.setSiteConfig(peerSiteConfig); + } + } + + @Test + public void dataIsNotOverReplicated() throws Exception { + File master1Dir = createTestDir("master1"), master2Dir = createTestDir("master2"); + String password = "password"; + + MiniAccumuloConfigImpl master1Cfg; + MiniAccumuloClusterImpl master1Cluster; + while (true) { + master1Cfg= new MiniAccumuloConfigImpl(master1Dir, password); + master1Cfg.setNumTservers(1); + 
master1Cfg.setInstanceName("master1"); + + // Set up SSL if needed - AbstractMacIT.configureForEnvironment(master1Cfg, this.getClass(), AbstractMacIT.createSharedTestDir(this.getClass().getName() + "-ssl")); ++ ConfigurableMacIT.configureForEnvironment(master1Cfg, this.getClass(), ConfigurableMacIT.createSharedTestDir(this.getClass().getName() + "-ssl")); + + master1Cfg.setProperty(Property.REPLICATION_NAME, master1Cfg.getInstanceName()); + master1Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master1Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master1Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master1Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master1Cluster = new MiniAccumuloClusterImpl(master1Cfg); + setCoreSite(master1Cluster); + + try { + master1Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master1Cfg.getZooKeeperPort() + ", will retry"); + } + } + + MiniAccumuloConfigImpl master2Cfg; + MiniAccumuloClusterImpl master2Cluster; + while (true) { + master2Cfg = new MiniAccumuloConfigImpl(master2Dir, password); + master2Cfg.setNumTservers(1); + master2Cfg.setInstanceName("master2"); + + // Set up SSL if needed. 
Need to share the same SSL truststore as master1 + this.updatePeerConfigFromPrimary(master1Cfg, master2Cfg); + + master2Cfg.setProperty(Property.REPLICATION_NAME, master2Cfg.getInstanceName()); + master2Cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M"); + master2Cfg.setProperty(Property.REPLICATION_THREADCHECK, "5m"); + master2Cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); + master2Cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); + master2Cluster = new MiniAccumuloClusterImpl(master2Cfg); + setCoreSite(master2Cluster); + + try { + master2Cluster.start(); + break; + } catch (ZooKeeperBindException e) { + log.warn("Failed to start ZooKeeper on " + master2Cfg.getZooKeeperPort() + ", will retry"); + } + } + + try { + Connector connMaster1 = master1Cluster.getConnector("root", new PasswordToken(password)), connMaster2 = master2Cluster.getConnector("root", + new PasswordToken(password)); + + String master1UserName = "master1", master1Password = "foo"; + String master2UserName = "master2", master2Password = "bar"; + String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName(); + + connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password)); + connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password)); + + // Configure the credentials we should use to authenticate ourselves to the peer for replication + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName); + connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password); + + connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName); + 
connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password); + + connMaster1.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master2Cluster.getInstanceName(), master2Cluster.getZooKeepers()))); + + connMaster2.instanceOperations().setProperty( + Property.REPLICATION_PEERS.getKey() + master1Cluster.getInstanceName(), + ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, + AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers()))); + + connMaster1.tableOperations().create(master1Table, false); + String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table); + Assert.assertNotNull(master1TableId); + + connMaster2.tableOperations().create(master2Table, false); + String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table); + Assert.assertNotNull(master2TableId); + + // Replicate master1 in the master1 cluster to master2 in the master2 cluster + connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster1.tableOperations().setProperty(master1Table, + Property.TABLE_REPLICATION_TARGET.getKey() + master2Cluster.getInstanceName(), master2TableId); + + // Replicate master2 in the master2 cluster to master1 in the master2 cluster + connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true"); + connMaster2.tableOperations().setProperty(master2Table, + Property.TABLE_REPLICATION_TARGET.getKey() + master1Cluster.getInstanceName(), master1TableId); + + // Give our replication user the ability to write to the respective table + connMaster1.securityOperations().grantTablePermission(master1UserName, 
master1Table, TablePermission.WRITE); + connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE); + + IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class); + SummingCombiner.setEncodingType(summingCombiner, Type.STRING); + SummingCombiner.setCombineAllColumns(summingCombiner, true); + + // Set a combiner on both instances that will sum multiple values + // We can use this to verify that the mutation was not sent multiple times + connMaster1.tableOperations().attachIterator(master1Table, summingCombiner); + connMaster2.tableOperations().attachIterator(master2Table, summingCombiner); + + // Write a single entry + BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("count", "", "1"); + bw.addMutation(m); + bw.close(); + + Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table); + + log.info("Found {} that need replication from master1", files); + + // Kill and restart the tserver to close the WAL on master1 + for (ProcessReference proc : master1Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master1Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master1Cluster.exec(TabletServer.class); + + log.info("Restarted tserver on master1"); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Sanity check that the element is there on master1 + Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + // Wait for this table to replicate + connMaster1.replicationOperations().drain(master1Table, files); + + Thread.sleep(5000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", 
entry.getValue().toString()); + + // Wait for master2 to finish replicating it back + files = connMaster2.replicationOperations().referencedFiles(master2Table); + + // Kill and restart the tserver to close the WAL on master2 + for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + master2Cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + + master2Cluster.exec(TabletServer.class); + + // Try to avoid ACCUMULO-2964 + Thread.sleep(1000); + + // Check that the element made it to master2 only once + s = connMaster2.createScanner(master2Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + + connMaster2.replicationOperations().drain(master2Table, files); + + Thread.sleep(5000); + + // Verify that the entry wasn't sent back to master1 + s = connMaster1.createScanner(master1Table, Authorizations.EMPTY); + entry = Iterables.getOnlyElement(s); + Assert.assertEquals("1", entry.getValue().toString()); + } finally { + master1Cluster.stop(); + master2Cluster.stop(); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 7f42aa0,0000000..544fb36 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@@ -1,443 -1,0 +1,442 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; - import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.ClientContext; ++import org.apache.accumulo.core.client.impl.ClientExecReturn; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.security.Authorizations; +import 
org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; - import org.apache.accumulo.test.functional.AbstractMacIT; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.bouncycastle.util.Arrays; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ACCUMULO-3302 series of tests which ensure that a WAL is not prematurely closed when a TServer may still continue to use it. Checking that no tablet references a + * WAL is insufficient to determine if a WAL will never be used in the future. + */ +public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacIT { + private static final Logger log = LoggerFactory.getLogger(GarbageCollectorCommunicatesWithTServersIT.class); + + private final int GC_PERIOD_SECONDS = 1; + + @Override + public int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); + // Wait longer to try to let the replication table come online before a cycle runs + cfg.setProperty(Property.GC_CYCLE_START, "10s"); + cfg.setProperty(Property.REPLICATION_NAME, "master"); + // Set really long delays for the master to do stuff for replication. 
We don't need + // it to be doing anything, so just let it sleep + cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "240s"); + cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "240s"); + cfg.setProperty(Property.REPLICATION_DRIVER_DELAY, "240s"); + // Pull down the maximum size of the wal so we can test close()'ing it. + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); + coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + /** + * Fetch all of the WALs referenced by tablets in the metadata table for this table + */ + private Set<String> getWalsForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + + Set<String> wals = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // hostname:port/uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + int index = cq.indexOf('/'); + // Normalize the path + String path = new Path(cq.substring(index + 1)).toString(); + log.debug("Extracted file: " + path); + wals.add(path); + } + + return wals; + } + + /** + * Fetch all of the rfiles referenced by tablets in the metadata table for this table + */ + private Set<String> getFilesForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, 
Authorizations.EMPTY); + Range r = MetadataSchema.TabletsSection.getRange(tableId); + s.setRange(r); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + + Set<String> rfiles = new HashSet<String>(); + for (Entry<Key,Value> entry : s) { + log.debug("Reading RFiles: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); + // uri://path/to/wal + String cq = entry.getKey().getColumnQualifier().toString(); + String path = new Path(cq).toString(); + log.debug("Normalize path to rfile: {}", path); + rfiles.add(path); + } + + return rfiles; + } + + /** + * Get the replication status messages for the given table that exist in the metadata table (~repl entries) + */ + private Map<String,Status> getMetadataStatusForTable(String tableName) throws Exception { + final Connector conn = getConnector(); + final String tableId = conn.tableOperations().tableIdMap().get(tableName); + + Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); + + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Range r = MetadataSchema.ReplicationSection.getRange(); + s.setRange(r); + s.fetchColumn(MetadataSchema.ReplicationSection.COLF, new Text(tableId)); + + Map<String,Status> fileToStatus = new HashMap<String,Status>(); + for (Entry<Key,Value> entry : s) { + Text file = new Text(); + MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); + Status status = Status.parseFrom(entry.getValue().get()); + log.info("Got status for {}: {}", file, ProtobufUtil.toString(status)); + fileToStatus.put(file.toString(), status); + } + + return fileToStatus; + } + + @Test + public void testActiveWalPrecludesClosing() throws Exception { + final String table = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + 
conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.flush(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Waiting for replication table to come online"); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = getFilesForTable(table); + 
Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has ran + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. 
+ + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + } + + @Test + public void testUnreferencedWalInTserverIsClosed() throws Exception { + final String[] names = getUniqueNames(2); + // `table` will be replicated, `otherTable` is only used to roll the WAL on the tserver + final String table = names[0], otherTable = names[1]; + final Connector conn = getConnector(); + + // Bring the replication table online first and foremost + ReplicationTable.setOnline(conn); + + log.info("Creating {}", table); + conn.tableOperations().create(table); + + conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); + + log.info("Writing a few mutations to the table"); + + BatchWriter bw = conn.createBatchWriter(table, null); + + byte[] empty = new byte[0]; + for (int i = 0; i < 5; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } + + log.info("Flushing mutations to the server"); + bw.close(); + + log.info("Checking that metadata only has one WAL recorded for this table"); + + Set<String> wals = getWalsForTable(table); + Assert.assertEquals("Expected to only find one WAL for the table", 1, wals.size()); + + log.info("Compacting the table which will remove all WALs from the tablets"); + + // Flush our test table to remove the WAL references in it + conn.tableOperations().flush(table, null, null, true); + // Flush the metadata table too because it will have a reference to 
the WAL + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + + log.info("Fetching replication statuses from metadata table"); + + Map<String,Status> fileToStatus = getMetadataStatusForTable(table); + + Assert.assertEquals("Expected to only find one replication status message", 1, fileToStatus.size()); + + String walName = fileToStatus.keySet().iterator().next(); + Assert.assertEquals("Expected log file name from tablet to equal replication entry", wals.iterator().next(), walName); + + Status status = fileToStatus.get(walName); + + Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); + + log.info("Checking to see that log entries are removed from tablet section after MinC"); + // After compaction, the log column should be gone from the tablet + Set<String> walsAfterMinc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); + + Set<String> filesForTable = getFilesForTable(table); + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); + log.info("Files for table before MajC: {}", filesForTable); + + // Issue a MajC to roll a new file in HDFS + conn.tableOperations().compact(table, null, null, false, true); + + Set<String> filesForTableAfterCompaction = getFilesForTable(table); + + log.info("Files for table after MajC: {}", filesForTableAfterCompaction); + + Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTableAfterCompaction.size()); + Assert.assertNotEquals("Expected the files before and after compaction to differ", filesForTableAfterCompaction, filesForTable); + + // Use the rfile which was just replaced by the MajC to determine when the GC has ran + Path fileToBeDeleted = new Path(filesForTable.iterator().next()); + FileSystem fs = fileToBeDeleted.getFileSystem(new Configuration()); + + boolean fileExists = fs.exists(fileToBeDeleted); + while (fileExists) { + log.info("File which 
should get deleted still exists: {}", fileToBeDeleted); + Thread.sleep(2000); + fileExists = fs.exists(fileToBeDeleted); + } + + // At this point in time, we *know* that the GarbageCollector has run which means that the Status + // for our WAL should not be altered. + + log.info("Re-checking that WALs are still not referenced for our table"); + + Set<String> walsAfterMajc = getWalsForTable(table); + Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); + + Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); + Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); + + Assert.assertEquals("Status before and after MinC should be identical", fileToStatus, fileToStatusAfterMinc); + + /* + * To verify that the WAL is still getting closed, we have to force the tserver to close the existing WAL and open a new one instead. The easiest way to do + * this is to write a load of data that will exceed the 1.33% full threshold that the logger keeps track of + */ + + conn.tableOperations().create(otherTable); + bw = conn.createBatchWriter(otherTable, null); + // 500k + byte[] bigValue = new byte[1024 * 500]; + Arrays.fill(bigValue, (byte)1); + // 500k * 50 + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put(empty, empty, bigValue); + bw.addMutation(m); + if (i % 10 == 0) { + bw.flush(); + } + } + + bw.close(); + + conn.tableOperations().flush(otherTable, null, null, true); + + // Get the tservers which the master deems as active - final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(AbstractMacIT.ROOT_PASSWORD)), ++ final ClientContext context = new ClientContext(conn.getInstance(), new Credentials("root", new PasswordToken(ConfigurableMacIT.ROOT_PASSWORD)), + getClientConfig()); + List<String> tservers = 
MasterClient.execute(context, new ClientExecReturn<List<String>,MasterClientService.Client>() { + @Override + public List<String> execute(MasterClientService.Client client) throws Exception { + return client.getActiveTservers(Tracer.traceInfo(), context.rpcCreds()); + } + }); + + Assert.assertEquals("Expected only one active tservers", 1, tservers.size()); + + String tserver = tservers.get(0); + + // Get the active WALs from that server + log.info("Fetching active WALs from {}", tserver); + + Client client = ThriftUtil.getTServerClient(tserver, context); + List<String> activeWalsForTserver = client.getActiveLogs(Tracer.traceInfo(), context.rpcCreds()); + + log.info("Active wals: {}", activeWalsForTserver); + + Assert.assertEquals("Expected to find only one active WAL", 1, activeWalsForTserver.size()); + + String activeWal = new Path(activeWalsForTserver.get(0)).toString(); + + Assert.assertNotEquals("Current active WAL on tserver should not be the original WAL we saw", walName, activeWal); + + log.info("Ensuring that replication status does get closed after WAL is no longer in use by Tserver"); + + do { + Map<String,Status> replicationStatuses = getMetadataStatusForTable(table); + + log.info("Got replication status messages {}", replicationStatuses); + Assert.assertEquals("Did not expect to find additional status records", 1, replicationStatuses.size()); + + status = replicationStatuses.values().iterator().next(); + log.info("Current status: {}", ProtobufUtil.toString(status)); + + if (status.getClosed()) { + return; + } + + log.info("Status is not yet closed, waiting for garbage collector to close it"); + + Thread.sleep(2000); + } while (true); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e16d55f/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java index 
68e57d1,0000000..6363ba5 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java @@@ -1,115 -1,0 +1,114 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.StatusUtil; +import org.apache.accumulo.core.replication.proto.Replication.Status; +import org.apache.accumulo.core.security.TablePermission; ++import org.apache.accumulo.harness.SharedMiniClusterIT; +import org.apache.accumulo.server.util.ReplicationTableUtil; - import org.apache.accumulo.test.functional.SimpleMacIT; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * + */ - @SuppressWarnings("deprecation") - public class StatusCombinerMacIT extends SimpleMacIT { ++public class StatusCombinerMacIT extends SharedMiniClusterIT { + + @Test + public void testCombinerSetOnMetadata() throws Exception { + TableOperations tops = getConnector().tableOperations(); + Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(MetadataTable.NAME); + + Assert.assertTrue(iterators.containsKey(ReplicationTableUtil.COMBINER_NAME)); + EnumSet<IteratorScope> scopes = 
iterators.get(ReplicationTableUtil.COMBINER_NAME); + Assert.assertEquals(3, scopes.size()); + Assert.assertTrue(scopes.contains(IteratorScope.scan)); + Assert.assertTrue(scopes.contains(IteratorScope.minc)); + Assert.assertTrue(scopes.contains(IteratorScope.majc)); + + Iterable<Entry<String,String>> propIter = tops.getProperties(MetadataTable.NAME); + HashMap<String,String> properties = new HashMap<String,String>(); + for (Entry<String,String> entry : propIter) { + properties.put(entry.getKey(), entry.getValue()); + } + + for (IteratorScope scope : scopes) { + String key = Property.TABLE_ITERATOR_PREFIX.getKey() + scope.name() + "." + ReplicationTableUtil.COMBINER_NAME + ".opt.columns"; + Assert.assertTrue("Properties did not contain key : " + key, properties.containsKey(key)); + Assert.assertEquals(MetadataSchema.ReplicationSection.COLF.toString(), properties.get(key)); + } + } + + @Test + public void test() throws Exception { + Connector conn = getConnector(); + + ReplicationTable.setOnline(conn); + conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); + long createTime = System.currentTimeMillis(); + try { + Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3"); + StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(createTime)); + bw.addMutation(m); + } finally { + bw.close(); + } + + Scanner s = ReplicationTable.getScanner(conn); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Assert.assertEquals(StatusUtil.fileCreatedValue(createTime), entry.getValue()); + + bw = ReplicationTable.getBatchWriter(conn); + try { + Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3"); + StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.replicated(Long.MAX_VALUE))); + bw.addMutation(m); + } finally { + bw.close(); + } + + s = 
ReplicationTable.getScanner(conn); + entry = Iterables.getOnlyElement(s); + Status stat = Status.parseFrom(entry.getValue().get()); + Assert.assertEquals(Long.MAX_VALUE, stat.getBegin()); + } + +}