http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java deleted file mode 100644 index 77198df..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java +++ /dev/null @@ -1,1436 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.IteratorSetting.Column; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.data.impl.KeyExtent; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.iterators.conf.ColumnSet; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; -import org.apache.accumulo.core.protobuf.ProtobufUtil; -import 
org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; -import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; -import org.apache.accumulo.core.replication.ReplicationTable; -import org.apache.accumulo.core.replication.ReplicationTarget; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.fate.zookeeper.ZooCache; -import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; -import org.apache.accumulo.fate.zookeeper.ZooLock; -import org.apache.accumulo.gc.SimpleGarbageCollector; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.server.log.WalStateManager; -import org.apache.accumulo.server.log.WalStateManager.WalState; -import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.replication.ReplicaSystemFactory; -import org.apache.accumulo.server.replication.StatusCombiner; -import org.apache.accumulo.server.replication.StatusFormatter; -import org.apache.accumulo.server.replication.StatusUtil; -import org.apache.accumulo.server.replication.proto.Replication.Status; -import org.apache.accumulo.server.util.ReplicationTableUtil; -import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.io.Text; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.protobuf.TextFormat; - -/** - * Replication tests which verify expected functionality using a single MAC instance. A MockReplicaSystem is used to "fake" the peer instance that we're - * replicating to. This lets us test replication in a functional way without having to worry about two real systems. 
- */ -public class ReplicationIT extends ConfigurableMacBase { - private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class); - - @Override - public int defaultTimeoutSeconds() { - return 60 * 10; - } - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - // Run the master replication loop frequently - cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s"); - cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s"); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M"); - cfg.setProperty(Property.GC_CYCLE_START, "1s"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "0"); - cfg.setProperty(Property.REPLICATION_NAME, "master"); - cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s"); - cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s"); - cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M"); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - cfg.setNumTservers(1); - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - private Multimap<String,String> getLogs(Connector conn) throws Exception { - // Map of server to tableId - Multimap<TServerInstance,String> serverToTableID = HashMultimap.create(); - Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - scanner.setRange(MetadataSchema.TabletsSection.getRange()); - scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); - for (Entry<Key,Value> entry : scanner) { - TServerInstance key = new TServerInstance(entry.getValue(), entry.getKey().getColumnQualifier()); - byte[] tableId = KeyExtent.tableOfMetadataRow(entry.getKey().getRow()); - serverToTableID.put(key, new String(tableId, UTF_8)); - } - // Map of logs to tableId - Multimap<String,String> logs = HashMultimap.create(); - Instance i = conn.getInstance(); - ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), ""); - WalStateManager wals = new WalStateManager(conn.getInstance(), zk); - for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) { - for (UUID id : entry.getValue()) { - Pair<WalState,Path> state = wals.state(entry.getKey(), id); - for (String tableId : serverToTableID.get(entry.getKey())) { - logs.put(state.getSecond().toString(), tableId); - } - } - } - return logs; - } - - private Multimap<String,String> getAllLogs(Connector conn) throws Exception { - Multimap<String,String> logs = getLogs(conn); - try { - Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY); - StatusSection.limit(scanner); - Text buff = new Text(); - for (Entry<Key,Value> entry : scanner) { - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return logs; - } - - StatusSection.getFile(entry.getKey(), buff); - String file = buff.toString(); - StatusSection.getTableId(entry.getKey(), buff); - String tableId = buff.toString(); - - logs.put(file, tableId); - } - } catch (TableOfflineException e) { - log.debug("Replication table isn't online yet"); - } - return logs; - } - - private void waitForGCLock(Connector conn) throws InterruptedException { - // Check if the GC process has the lock before wasting our retry attempts - ZooKeeperInstance zki = (ZooKeeperInstance) conn.getInstance(); - ZooCacheFactory zcf = new ZooCacheFactory(); - ZooCache zcache = zcf.getZooCache(zki.getZooKeepers(), zki.getZooKeepersSessionTimeOut()); - String zkPath = 
ZooUtil.getRoot(conn.getInstance()) + Constants.ZGC_LOCK; - log.info("Looking for GC lock at {}", zkPath); - byte[] data = ZooLock.getLockData(zcache, zkPath, null); - while (null == data) { - log.info("Waiting for GC ZooKeeper lock to be acquired"); - Thread.sleep(1000); - data = ZooLock.getLockData(zcache, zkPath, null); - } - } - - @Test - public void replicationTableCreated() throws AccumuloException, AccumuloSecurityException { - Assert.assertTrue(getConnector().tableOperations().exists(ReplicationTable.NAME)); - Assert.assertEquals(ReplicationTable.ID, getConnector().tableOperations().tableIdMap().get(ReplicationTable.NAME)); - } - - @Test - public void verifyReplicationTableConfig() throws AccumuloException, TableNotFoundException, AccumuloSecurityException { - TableOperations tops = getConnector().tableOperations(); - Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME); - - // verify the combiner is the only configured iterator (no versioning iterator) - Assert.assertEquals(1, iterators.size()); - - // look for combiner - Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME)); - Assert.assertTrue(iterators.get(ReplicationTable.COMBINER_NAME).containsAll(EnumSet.allOf(IteratorScope.class))); - for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) { - IteratorSetting is = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, scope); - Assert.assertEquals(30, is.getPriority()); - Assert.assertEquals(StatusCombiner.class.getName(), is.getIteratorClass()); - Assert.assertEquals(1, is.getOptions().size()); - Assert.assertTrue(is.getOptions().containsKey("columns")); - String cols = is.getOptions().get("columns"); - Column statusSectionCol = new Column(StatusSection.NAME); - Column workSectionCol = new Column(WorkSection.NAME); - Assert.assertEquals( - ColumnSet.encodeColumns(statusSectionCol.getColumnFamily(), statusSectionCol.getColumnQualifier()) + "," - + ColumnSet.encodeColumns(workSectionCol.getColumnFamily(), workSectionCol.getColumnQualifier()), cols); - } - - boolean foundLocalityGroups = false; - boolean foundLocalityGroupDef1 = false; - boolean foundLocalityGroupDef2 = false; - boolean foundFormatter = false; - Joiner j = Joiner.on(","); - Function<Text,String> textToString = new Function<Text,String>() { - @Override - public String apply(Text text) { - return text.toString(); - } - }; - for (Entry<String,String> p : tops.getProperties(ReplicationTable.NAME)) { - String key = p.getKey(); - String val = p.getValue(); - // STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS - if (key.equals(Property.TABLE_FORMATTER_CLASS.getKey()) && val.equals(StatusFormatter.class.getName())) { - // look for formatter - foundFormatter = true; - } else if (key.equals(Property.TABLE_LOCALITY_GROUPS.getKey()) && val.equals(j.join(ReplicationTable.LOCALITY_GROUPS.keySet()))) { - // look for locality groups enabled - foundLocalityGroups = true; - } else if (key.startsWith(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey())) { - // look for locality group column family definitions - if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.STATUS_LG_NAME) - && val.equals(j.join(Iterables.transform(ReplicationTable.STATUS_LG_COLFAMS, textToString)))) { - foundLocalityGroupDef1 = true; - } else if (key.equals(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + ReplicationTable.WORK_LG_NAME) - && val.equals(j.join(Iterables.transform(ReplicationTable.WORK_LG_COLFAMS, textToString)))) { - foundLocalityGroupDef2 = 
true; - } - } - } - Assert.assertTrue(foundLocalityGroups); - Assert.assertTrue(foundLocalityGroupDef1); - Assert.assertTrue(foundLocalityGroupDef2); - Assert.assertTrue(foundFormatter); - } - - @Test - public void correctRecordsCompleteFile() throws Exception { - Connector conn = getConnector(); - String table = "table1"; - conn.tableOperations().create(table); - // If we have more than one tserver, this is subject to a race condition. - conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); - - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - for (int i = 0; i < 10; i++) { - Mutation m = new Mutation(Integer.toString(i)); - m.put(new byte[0], new byte[0], new byte[0]); - bw.addMutation(m); - } - - bw.close(); - - // After writing data, we'll get a replication table online - boolean online = ReplicationTable.isOnline(conn); - int attempts = 10; - do { - if (!online) { - UtilWaitThread.sleep(2000); - online = ReplicationTable.isOnline(conn); - attempts--; - } - } while (!online && attempts > 0); - Assert.assertTrue("Replication table was not online", online); - - for (int i = 0; i < 5; i++) { - if (conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) { - break; - } - log.info("Could not read replication table, waiting and will retry"); - Thread.sleep(2000); - } - - Assert.assertTrue("'root' user could not read the replication table", - conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)); - - Set<String> replRows = Sets.newHashSet(); - Scanner scanner; - attempts = 5; - while (replRows.isEmpty() && attempts > 0) { - scanner = ReplicationTable.getScanner(conn); - StatusSection.limit(scanner); - for (Entry<Key,Value> entry : scanner) { - Key k = entry.getKey(); - - String fileUri = k.getRow().toString(); - try { - new URI(fileUri); - } catch (URISyntaxException e) { - Assert.fail("Expected a valid URI: " + fileUri); - } - - replRows.add(fileUri); - } - attempts--; - } - - Set<String> wals = Sets.newHashSet(); - attempts = 5; - Instance i = conn.getInstance(); - ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), ""); - while (wals.isEmpty() && attempts > 0) { - WalStateManager markers = new WalStateManager(i, zk); - for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) { - wals.add(entry.getKey().toString()); - } - attempts--; - } - - // We only have one file that should need replication (no trace table) - // We should find an entry in tablet and in the repl row - Assert.assertEquals("Rows found: " + replRows, 1, replRows.size()); - - // There should only be one extra WALog that replication doesn't know about - replRows.removeAll(wals); - Assert.assertEquals(2, wals.size()); - Assert.assertEquals(0, replRows.size()); - } - - @Test - public void noRecordsWithoutReplication() throws Exception { - Connector conn = getConnector(); - List<String> tables = new ArrayList<>(); - - // replication shouldn't be online when we begin - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - for (int i = 0; i < 5; i++) { - String name = "table" + i; - tables.add(name); - conn.tableOperations().create(name); - } - - // nor after we create some tables (that aren't being replicated) - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - for (String table : tables) { - writeSomeData(conn, table, 5, 5); - } - - // After writing data, still no replication table - Assert.assertFalse(ReplicationTable.isOnline(conn)); - 
- for (String table : tables) { - conn.tableOperations().compact(table, null, null, true, true); - } - - // After compacting data, still no replication table - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - for (String table : tables) { - conn.tableOperations().delete(table); - } - - // After deleting tables, still no replication table - Assert.assertFalse(ReplicationTable.isOnline(conn)); - } - - @Test - public void twoEntriesForTwoTables() throws Exception { - Connector conn = getConnector(); - String table1 = "table1", table2 = "table2"; - - // replication shouldn't exist when we begin - Assert.assertFalse("Replication table already online at the beginning of the test", ReplicationTable.isOnline(conn)); - - // Create two tables - conn.tableOperations().create(table1); - conn.tableOperations().create(table2); - - // Enable replication on table1 - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - - // Despite having replication on, we shouldn't have any need to write a record to it (and bring it online) - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - // Write some data to table1 - writeSomeData(conn, table1, 50, 50); - - // After the commit for these mutations finishes, we'll get a replication entry in accumulo.metadata for table1 - // Don't want to compact table1 as that would ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there - - // After writing data, we'll get a replication table online - boolean online = ReplicationTable.isOnline(conn); - int attempts = 10; - do { - if (!online) { - UtilWaitThread.sleep(5000); - online = ReplicationTable.isOnline(conn); - attempts--; - } - } while (!online && attempts > 0); - Assert.assertTrue("Replication table did not exist", online); - - Assert.assertTrue(ReplicationTable.isOnline(conn)); - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ); - - // Verify that we found a single replication record that's for table1 - Scanner s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - Iterator<Entry<Key,Value>> iter = s.iterator(); - attempts = 5; - while (attempts > 0) { - if (!iter.hasNext()) { - s.close(); - Thread.sleep(1000); - s = ReplicationTable.getScanner(conn); - iter = s.iterator(); - attempts--; - } else { - break; - } - } - Assert.assertTrue(iter.hasNext()); - Entry<Key,Value> entry = iter.next(); - // We should at least find one status record for this table, we might find a second if another log was started from ingesting the data - Assert.assertEquals("Expected to find replication entry for " + table1, conn.tableOperations().tableIdMap().get(table1), entry.getKey() - .getColumnQualifier().toString()); - s.close(); - - // Enable replication on table2 - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); - - // Write some data to table2 - writeSomeData(conn, table2, 50, 50); - - // After the commit on these mutations, we'll get a replication entry in accumulo.metadata for table2 - // Don't want to compact table2 as that would ultimately cause the entry in accumulo.metadata to be removed before we can verify it's there - - // After writing data, we'll get a replication table online - Assert.assertTrue(ReplicationTable.isOnline(conn)); - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ); - - Set<String> tableIds = Sets.newHashSet(conn.tableOperations().tableIdMap().get(table1), 
conn.tableOperations().tableIdMap().get(table2)); - Set<String> tableIdsForMetadata = Sets.newHashSet(tableIds); - - // Wait to make sure the table permissions propagate - Thread.sleep(5000); - - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(MetadataSchema.ReplicationSection.getRange()); - - List<Entry<Key,Value>> records = new ArrayList<>(); - for (Entry<Key,Value> metadata : s) { - records.add(metadata); - } - - Assert.assertEquals("Expected to find 2 records, but actually found " + records, 2, records.size()); - - for (Entry<Key,Value> metadata : records) { - Assert.assertTrue("Expected record to be in metadata but wasn't " + metadata.getKey().toStringNoTruncate() + ", tableIds remaining " - + tableIdsForMetadata, tableIdsForMetadata.remove(metadata.getKey().getColumnQualifier().toString())); - } - - Assert.assertTrue("Expected that we had removed all metadata entries " + tableIdsForMetadata, tableIdsForMetadata.isEmpty()); - - // The master should be creating these records in the replication table from the metadata table every second - Thread.sleep(5000); - - // Verify that we found two replication records: one for table1 and one for table2 - s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - iter = s.iterator(); - Assert.assertTrue("Found no records in replication table", iter.hasNext()); - entry = iter.next(); - Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString())); - Assert.assertTrue("Expected to find two elements in replication table, only found one ", iter.hasNext()); - entry = iter.next(); - Assert.assertTrue("Expected to find element in replication table", tableIds.remove(entry.getKey().getColumnQualifier().toString())); - Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext()); - } - - private void writeSomeData(Connector conn, String table, int rows, int cols) throws Exception { - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - for (int row = 0; row < rows; row++) { - Mutation m = new Mutation(Integer.toString(row)); - for (int col = 0; col < cols; col++) { - String value = Integer.toString(col); - m.put(value, "", value); - } - bw.addMutation(m); - } - bw.close(); - } - - @Test - public void replicationEntriesPrecludeWalDeletion() throws Exception { - final Connector conn = getConnector(); - String table1 = "table1", table2 = "table2", table3 = "table3"; - final Multimap<String,String> logs = HashMultimap.create(); - final AtomicBoolean keepRunning = new AtomicBoolean(true); - - Thread t = new Thread(new Runnable() { - @Override - public void run() { - // Should really be able to interrupt here, but the Scanner throws a fit to the logger - // when that happens - while (keepRunning.get()) { - try { - logs.putAll(getAllLogs(conn)); - } catch (Exception e) { - log.error("Error getting logs", e); - } - } - } - - }); - - t.start(); - - conn.tableOperations().create(table1); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - Thread.sleep(2000); - - // Write some data to table1 - writeSomeData(conn, table1, 200, 500); - - conn.tableOperations().create(table2); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - 
Thread.sleep(2000); - - writeSomeData(conn, table2, 200, 500); - - conn.tableOperations().create(table3); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - Thread.sleep(2000); - - writeSomeData(conn, table3, 200, 500); - - // Force a write to metadata for the data written - for (String table : Arrays.asList(table1, table2, table3)) { - conn.tableOperations().flush(table, null, null, true); - } - - keepRunning.set(false); - t.join(5000); - - // The master is only running every second to create records in the replication table from the metadata table - // Sleep a sufficient amount of time to ensure that we get the straggling WALs that might have been created at the end - Thread.sleep(5000); - - Scanner s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - Set<String> replFiles = new HashSet<>(); - for (Entry<Key,Value> entry : s) { - replFiles.add(entry.getKey().getRow().toString()); - } - - // We might have a WAL that was used solely for the replication table - // We want to remove that from our list as it should not appear in the replication table - String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME); - Iterator<Entry<String,String>> observedLogs = logs.entries().iterator(); - while (observedLogs.hasNext()) { - Entry<String,String> observedLog = observedLogs.next(); - if (replicationTableId.equals(observedLog.getValue())) { - log.info("Removing {} because its tableId is for the replication table", observedLog); - observedLogs.remove(); - } - } - - // We should have *some* reference to each log that was seen in the metadata table - // They might not yet all be closed though (might be newfile) - Assert.assertTrue("Metadata log distribution: " + logs + ", replFiles " + replFiles, logs.keySet().containsAll(replFiles)); - Assert.assertTrue("Difference between replication entries and current logs is bigger than one", logs.keySet().size() - replFiles.size() <= 1); - - final Configuration conf = new Configuration(); - for (String replFile : replFiles) { - Path p = new Path(replFile); - FileSystem fs = p.getFileSystem(conf); - Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p)); - } - } - - @Test - public void combinerWorksOnMetadata() throws Exception { - Connector conn = getConnector(); - - conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE); - - ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME); - - Status stat1 = StatusUtil.fileCreated(100); - Status stat2 = StatusUtil.fileClosed(); - - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid"); - m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1)); - bw.addMutation(m); - bw.close(); - - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(ReplicationSection.getRange()); - - Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get()); - Assert.assertEquals(stat1, actual); - - bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid"); - m.put(ReplicationSection.COLF, new Text("1"), 
ProtobufUtil.toValue(stat2)); - bw.addMutation(m); - bw.close(); - - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(ReplicationSection.getRange()); - - actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get()); - Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build(); - - Assert.assertEquals(expected, actual); - } - - @Test - public void noDeadlock() throws Exception { - final Connector conn = getConnector(); - - ReplicationTable.setOnline(conn); - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); - conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null); - - String table1 = "table1", table2 = "table2", table3 = "table3"; - conn.tableOperations().create(table1); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - conn.tableOperations().create(table2); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - conn.tableOperations().create(table3); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - - writeSomeData(conn, table1, 200, 500); - - writeSomeData(conn, table2, 200, 500); - - writeSomeData(conn, table3, 200, 500); - - // Flush everything to try to make the replication records - for (String table : Arrays.asList(table1, table2, table3)) { - conn.tableOperations().flush(table, null, null, true); - } - - for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) { - Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator()); - } - } - - @Test - public void filesClosedAfterUnused() throws Exception { - Connector conn = getConnector(); - - String table = "table"; - conn.tableOperations().create(table); - String tableId = conn.tableOperations().tableIdMap().get(table); - - Assert.assertNotNull(tableId); - - conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - // MockReplicaSystem will just sleep for 50 seconds before saying the data was replicated - conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1", - ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000")); - - // Write a mutation to make a log file - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - Mutation m = new Mutation("one"); - m.put("", "", ""); - bw.addMutation(m); - bw.close(); - - // Write another to make sure the logger rolls itself? 
- bw = conn.createBatchWriter(table, new BatchWriterConfig()); - m = new Mutation("three"); - m.put("", "", ""); - bw.addMutation(m); - bw.close(); - - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME); - s.setRange(TabletsSection.getRange(tableId)); - Set<String> wals = new HashSet<>(); - for (Entry<Key,Value> entry : s) { - LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); - wals.add(new Path(logEntry.filename).toString()); - } - - log.warn("Found wals {}", wals); - - bw = conn.createBatchWriter(table, new BatchWriterConfig()); - m = new Mutation("three"); - byte[] bytes = new byte[1024 * 1024]; - m.put("1".getBytes(), new byte[0], bytes); - m.put("2".getBytes(), new byte[0], bytes); - m.put("3".getBytes(), new byte[0], bytes); - m.put("4".getBytes(), new byte[0], bytes); - m.put("5".getBytes(), new byte[0], bytes); - bw.addMutation(m); - bw.close(); - - conn.tableOperations().flush(table, null, null, true); - - while (!ReplicationTable.isOnline(conn)) { - UtilWaitThread.sleep(2000); - } - - for (int i = 0; i < 10; i++) { - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.fetchColumnFamily(LogColumnFamily.NAME); - s.setRange(TabletsSection.getRange(tableId)); - for (Entry<Key,Value> entry : s) { - log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue()); - } - - try { - s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - Text buff = new Text(); - boolean allReferencedLogsClosed = true; - int recordsFound = 0; - for (Entry<Key,Value> e : s) { - recordsFound++; - StatusSection.getFile(e.getKey(), buff); - String file = buff.toString(); - if (wals.contains(file)) { - Status stat = Status.parseFrom(e.getValue().get()); - if (!stat.getClosed()) { - log.info("{} wasn't closed", file); - allReferencedLogsClosed = false; - } - } - } - - if (recordsFound > 0 && allReferencedLogsClosed) { - return; - } - Thread.sleep(2000); - } catch (RuntimeException e) { - Throwable cause = e.getCause(); - if (cause instanceof AccumuloSecurityException) { - AccumuloSecurityException ase = (AccumuloSecurityException) cause; - switch (ase.getSecurityErrorCode()) { - case PERMISSION_DENIED: - // We tried to read the replication table before the GRANT went through - Thread.sleep(2000); - break; - default: - throw e; - } - } - } - } - - Assert.fail("We had a file that was referenced but didn't get closed"); - } - - @Test - public void singleTableWithSingleTarget() throws Exception { - // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons - // against expected Status messages. 
- getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); - - Connector conn = getConnector(); - String table1 = "table1"; - - // replication shouldn't be online when we begin - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - // Create a table - conn.tableOperations().create(table1); - - int attempts = 10; - - // Might think the table doesn't yet exist, retry - while (attempts > 0) { - try { - // Enable replication on table1 - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - // Replicate table1 to cluster1 in the table with id of '4' - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4"); - // Sleep for 100 seconds before saying something is replicated - conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1", - ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000")); - break; - } catch (Exception e) { - attempts--; - if (attempts <= 0) { - throw e; - } - UtilWaitThread.sleep(2000); - } - } - - // Write some data to table1 - writeSomeData(conn, table1, 2000, 50); - - // Make sure the replication table is online at this point - boolean online = ReplicationTable.isOnline(conn); - attempts = 10; - do { - if (!online) { - UtilWaitThread.sleep(2000); - online = ReplicationTable.isOnline(conn); - attempts--; - } - } while (!online && attempts > 0); - Assert.assertTrue("Replication table was never created", online); - - // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master) - for (int i = 0; i < 10 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) { - UtilWaitThread.sleep(2000); - } - - Assert.assertTrue("Combiner was never set on replication table", - conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME)); - - // Trigger the minor compaction, waiting for it to finish. 
- // This should write the entry to metadata that the file has data - conn.tableOperations().flush(table1, null, null, true); - - // Make sure that we have one status element, should be a new file - Scanner s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - Entry<Key,Value> entry = null; - Status expectedStatus = StatusUtil.openWithUnknownLength(); - attempts = 10; - // This record will move from new to new with infinite length because of the minc (flush) - while (null == entry && attempts > 0) { - try { - entry = Iterables.getOnlyElement(s); - Status actual = Status.parseFrom(entry.getValue().get()); - if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) { - entry = null; - // the master process didn't yet fire and write the new mutation, wait for it to do - // so and try to read it again - Thread.sleep(1000); - } - } catch (NoSuchElementException e) { - entry = null; - Thread.sleep(500); - } catch (IllegalArgumentException e) { - // saw this contain 2 elements once - s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - throw e; - } finally { - attempts--; - } - } - - Assert.assertNotNull("Could not find expected entry in replication table", entry); - Status actual = Status.parseFrom(entry.getValue().get()); - Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), - !actual.getClosed() && actual.getInfiniteEnd()); - - // Try a couple of times to watch for the work record to be created - boolean notFound = true; - for (int i = 0; i < 10 && notFound; i++) { - s = ReplicationTable.getScanner(conn); - WorkSection.limit(s); - int elementsFound = Iterables.size(s); - if (0 < elementsFound) { - Assert.assertEquals(1, elementsFound); - notFound = false; - } - Thread.sleep(500); - } - - // If we didn't find the work record, print the contents of the table - if (notFound) { - s = ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - Assert.assertFalse("Did not find the work entry for the status entry", notFound); - } - - // Write some more data so that we over-run the single WAL - writeSomeData(conn, table1, 3000, 50); - - log.info("Issued compaction for table"); - conn.tableOperations().compact(table1, null, null, true, true); - log.info("Compaction completed"); - - // Master is creating entries in the replication table from the metadata table every second. - // Compaction should trigger the record to be written to metadata. Wait a bit to ensure - // that the master has time to work. 
- Thread.sleep(5000); - - s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - int numRecords = 0; - for (Entry<Key,Value> e : s) { - numRecords++; - log.info("Found status record {}\t{}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get()))); - } - - Assert.assertEquals(2, numRecords); - - // We should eventually get 2 work records recorded, need to account for a potential delay though - // might see: status1 -> work1 -> status2 -> (our scans) -> work2 - notFound = true; - for (int i = 0; i < 10 && notFound; i++) { - s = ReplicationTable.getScanner(conn); - WorkSection.limit(s); - int elementsFound = Iterables.size(s); - if (2 == elementsFound) { - notFound = false; - } - Thread.sleep(500); - } - - // If we didn't find the work record, print the contents of the table - if (notFound) { - s = ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - Assert.assertFalse("Did not find the work entries for the status entries", notFound); - } - } - - @Test - public void correctClusterNameInWorkEntry() throws Exception { - Connector conn = getConnector(); - String table1 = "table1"; - - // replication shouldn't be online when we begin - Assert.assertFalse(ReplicationTable.isOnline(conn)); - - // Create a table - conn.tableOperations().create(table1); - - int attempts = 5; - while (attempts > 0) { - try { - // Enable replication on table1 - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - // Replicate table1 to cluster1 in the table with id of '4' - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4"); - attempts = 0; - } catch (Exception e) { - attempts--; - if (attempts <= 0) { - throw e; - } - UtilWaitThread.sleep(500); - } - } - - // Write some data to table1 - writeSomeData(conn, table1, 2000, 50); - conn.tableOperations().flush(table1, null, null, true); - - String tableId = conn.tableOperations().tableIdMap().get(table1); - Assert.assertNotNull("Table ID was null", tableId); - - // Make sure the replication table exists at this point - boolean online = ReplicationTable.isOnline(conn); - attempts = 5; - do { - if (!online) { - UtilWaitThread.sleep(500); - online = ReplicationTable.isOnline(conn); - attempts--; - } - } while (!online && attempts > 0); - Assert.assertTrue("Replication table did not exist", online); - - for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) { - Thread.sleep(1000); - } - - Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)); - - boolean notFound = true; - Scanner s; - for (int i = 0; i < 10 && notFound; i++) { - s = ReplicationTable.getScanner(conn); - WorkSection.limit(s); - try { - Entry<Key,Value> e = Iterables.getOnlyElement(s); - Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText(); - Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier()); - notFound = false; - } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) { - s = ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - Assert.fail("Found more than one work section entry"); - } - - Thread.sleep(500); - } - - if (notFound) { - s = 
ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - Assert.assertFalse("Did not find the work entry for the status entry", notFound); - } - } - - @Test - public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception { - getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); - - final Connector conn = getConnector(); - - ReplicationTable.setOnline(conn); - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); - conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null); - - final AtomicBoolean keepRunning = new AtomicBoolean(true); - final Set<String> metadataWals = new HashSet<>(); - - Thread t = new Thread(new Runnable() { - @Override - public void run() { - // Should really be able to interrupt here, but the Scanner throws a fit to the logger - // when that happens - while (keepRunning.get()) { - try { - metadataWals.addAll(getLogs(conn).keySet()); - } catch (Exception e) { - log.error("Error getting logs", e); - } - } - } - - }); - - t.start(); - - String table1 = "table1", table2 = "table2", table3 = "table3"; - - try { - conn.tableOperations().create(table1); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1", - ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null)); - - // Write some data to table1 - writeSomeData(conn, table1, 200, 500); - - conn.tableOperations().create(table2); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - - writeSomeData(conn, table2, 200, 500); - - conn.tableOperations().create(table3); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true"); - conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "1"); - - writeSomeData(conn, table3, 200, 500); - - // Flush everything to try to make the replication records - for (String table : Arrays.asList(table1, table2, table3)) { - conn.tableOperations().compact(table, null, null, true, true); - } - } finally { - keepRunning.set(false); - t.join(5000); - Assert.assertFalse(t.isAlive()); - } - - // Kill the tserver(s) and restart them - // to ensure that the WALs we previously observed all move to closed. 
- cluster.getClusterControl().stop(ServerType.TABLET_SERVER); - cluster.getClusterControl().start(ServerType.TABLET_SERVER); - - // Make sure we can read all the tables (recovery complete) - for (String table : Arrays.asList(table1, table2, table3)) { - Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator()); - } - - // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses - // in the metadata table, and then in the replication table - Process gc = cluster.exec(SimpleGarbageCollector.class); - - waitForGCLock(conn); - - Thread.sleep(1000); - - log.info("GC is up and should have had time to run at least once by now"); - - try { - boolean allClosed = true; - - // We should either find all closed records or no records - // After they're closed, they are candidates for deletion - for (int i = 0; i < 10; i++) { - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(Range.prefix(ReplicationSection.getRowPrefix())); - Iterator<Entry<Key,Value>> iter = s.iterator(); - - long recordsFound = 0l; - while (allClosed && iter.hasNext()) { - Entry<Key,Value> entry = iter.next(); - String wal = entry.getKey().getRow().toString(); - if (metadataWals.contains(wal)) { - Status status = Status.parseFrom(entry.getValue().get()); - log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status)); - allClosed &= status.getClosed(); - recordsFound++; - } - } - - log.info("Found {} records from the metadata table", recordsFound); - if (allClosed) { - break; - } - - UtilWaitThread.sleep(2000); - } - - if (!allClosed) { - Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(Range.prefix(ReplicationSection.getRowPrefix())); - for (Entry<Key,Value> entry : s) { - log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get()))); - } - Assert.fail("Expected all replication records in the metadata table to be closed"); - } - - for (int i = 0; i < 10; i++) { - allClosed = true; - - Scanner s = ReplicationTable.getScanner(conn); - Iterator<Entry<Key,Value>> iter = s.iterator(); - - long recordsFound = 0l; - while (allClosed && iter.hasNext()) { - Entry<Key,Value> entry = iter.next(); - String wal = entry.getKey().getRow().toString(); - if (metadataWals.contains(wal)) { - Status status = Status.parseFrom(entry.getValue().get()); - log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status)); - allClosed &= status.getClosed(); - recordsFound++; - } - } - - log.info("Found {} records from the replication table", recordsFound); - if (allClosed) { - break; - } - - UtilWaitThread.sleep(3000); - } - - if (!allClosed) { - Scanner s = ReplicationTable.getScanner(conn); - StatusSection.limit(s); - for (Entry<Key,Value> entry : s) { - log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get()))); - } - Assert.fail("Expected all replication records in the replication table to be closed"); - } - - } finally { - gc.destroy(); - gc.waitFor(); - } - - } - - @Test - public void replicatedStatusEntriesAreDeleted() throws Exception { - // Just stop it now, we'll restart it after we restart the tserver - getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); - - final Connector conn = getConnector(); - log.info("Got connector to MAC"); - String table1 = "table1"; - - // replication shouldn't be online when we begin - 
Assert.assertFalse(ReplicationTable.isOnline(conn)); - - // Create a table - conn.tableOperations().create(table1); - - int attempts = 5; - while (attempts > 0) { - try { - // Enable replication on table1 - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true"); - // Replicate table1 to cluster1 in the table with id of '4' - conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4"); - // Use the MockReplicaSystem impl and sleep for 1 second - conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1", - ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000")); - attempts = 0; - } catch (Exception e) { - attempts--; - if (attempts <= 0) { - throw e; - } - UtilWaitThread.sleep(500); - } - } - - String tableId = conn.tableOperations().tableIdMap().get(table1); - Assert.assertNotNull("Could not determine table id for " + table1, tableId); - - // Write some data to table1 - writeSomeData(conn, table1, 2000, 50); - conn.tableOperations().flush(table1, null, null, true); - - // Make sure the replication table exists at this point - boolean online = ReplicationTable.isOnline(conn); - attempts = 10; - do { - if (!online) { - UtilWaitThread.sleep(1000); - online = ReplicationTable.isOnline(conn); - attempts--; - } - } while (!online && attempts > 0); - Assert.assertTrue("Replication table did not exist", online); - - // Grant ourselves the write permission for later - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); - - log.info("Checking for replication entries in the metadata table"); - // Then we need to get those records over to the replication table - Scanner s; - Set<String> entries = new HashSet<>(); - for (int i = 0; i < 5; i++) { - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(ReplicationSection.getRange()); - entries.clear(); - for (Entry<Key,Value> entry : s) { - entries.add(entry.getKey().getRow().toString()); - log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - } - if (!entries.isEmpty()) { - log.info("Replication entries {}", entries); - break; - } - Thread.sleep(1000); - } - - Assert.assertFalse("Did not find any replication entries in the metadata table", entries.isEmpty()); - - // Find the WorkSection record that will be created for that data we ingested - boolean notFound = true; - for (int i = 0; i < 10 && notFound; i++) { - try { - s = ReplicationTable.getScanner(conn); - WorkSection.limit(s); - Entry<Key,Value> e = Iterables.getOnlyElement(s); - log.info("Found entry: " + e.getKey().toStringNoTruncate()); - Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText(); - Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier()); - notFound = false; - } catch (NoSuchElementException e) { - - } catch (IllegalArgumentException e) { - // Somehow we got more than one element. 
Log what they were - s = ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue()); - } - Assert.fail("Found more than one work section entry"); - } catch (RuntimeException e) { - // Catch a propagation issue, fail if it's not what we expect - Throwable cause = e.getCause(); - if (cause instanceof AccumuloSecurityException) { - AccumuloSecurityException sec = (AccumuloSecurityException) cause; - switch (sec.getSecurityErrorCode()) { - case PERMISSION_DENIED: - // retry -- the grant didn't happen yet - log.warn("Sleeping because permission was denied"); - break; - default: - throw e; - } - } else { - throw e; - } - } - - Thread.sleep(2000); - } - - if (notFound) { - s = ReplicationTable.getScanner(conn); - for (Entry<Key,Value> content : s) { - log.info(content.getKey().toStringNoTruncate() + " => " + ProtobufUtil.toString(Status.parseFrom(content.getValue().get()))); - } - Assert.assertFalse("Did not find the work entry for the status entry", notFound); - } - - /** - * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all - * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table - * anymore - */ - - log.info("Killing tserver"); - // Kill the tserver(s) and restart them - // to ensure that the WALs we previously observed all move to closed. - cluster.getClusterControl().stop(ServerType.TABLET_SERVER); - - log.info("Starting tserver"); - cluster.getClusterControl().start(ServerType.TABLET_SERVER); - - log.info("Waiting to read tables"); - UtilWaitThread.sleep(2 * 3 * 1000); - - // Make sure we can read all the tables (recovery complete) - for (String table : new String[] {MetadataTable.NAME, table1}) { - Iterators.size(conn.createScanner(table, Authorizations.EMPTY).iterator()); - } - - log.info("Recovered metadata:"); - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - for (Entry<Key,Value> entry : s) { - log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - } - - cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR); - - // Wait for a bit since the GC has to run (should be running after a one second delay) - waitForGCLock(conn); - - Thread.sleep(1000); - - log.info("After GC"); - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - for (Entry<Key,Value> entry : s) { - log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); - } - - // We expect no records in the metadata table after compaction. We have to poll - // because we have to wait for the StatusMaker's next iteration which will clean - // up the dangling *closed* records after we create the record in the replication table. 
- // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record - log.info("Checking metadata table for replication entries"); - Set<String> remaining = new HashSet<>(); - for (int i = 0; i < 10; i++) { - s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(ReplicationSection.getRange()); - remaining.clear(); - for (Entry<Key,Value> e : s) { - remaining.add(e.getKey().getRow().toString()); - } - remaining.retainAll(entries); - if (remaining.isEmpty()) { - break; - } - log.info("remaining {}", remaining); - Thread.sleep(2000); - log.info(""); - } - - Assert.assertTrue("Replication status messages were not cleaned up from metadata table", remaining.isEmpty()); - - /** - * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be - * deleted after replication occurs - */ - - int recordsFound = 0; - for (int i = 0; i < 30; i++) { - s = ReplicationTable.getScanner(conn); - recordsFound = 0; - for (Entry<Key,Value> entry : s) { - recordsFound++; - log.info("{} {}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(entry.getValue().get()))); - } - - if (recordsFound <= 2) { - break; - } else { - Thread.sleep(1000); - log.info(""); - } - } - - Assert.assertTrue("Found unexpected replication records in the replication table", recordsFound <= 2); - } -}
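For reference, every test in the removed ReplicationIT wires up its fake peer with the same three-step pattern; this is condensed from the deleted file (conn is the Connector to the MAC instance, and the trailing "1000" is the artificial delay, in milliseconds, that MockReplicaSystem sleeps before saying the data was replicated):

    // Mark table1 for replication, targeting the table with id "4" on the peer named "cluster1"
    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
    conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGET.getKey() + "cluster1", "4");
    // Back the "cluster1" peer with MockReplicaSystem instead of a real remote instance
    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));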
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java deleted file mode 100644 index 80bc69d..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationRandomWalkIT.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication; - -import static org.apache.accumulo.core.conf.Property.TSERV_ARCHIVE_WALOGS; -import static org.apache.accumulo.core.conf.Property.TSERV_WALOG_MAX_SIZE; - -import java.util.Properties; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.accumulo.test.randomwalk.Environment; -import org.apache.accumulo.test.randomwalk.concurrent.Replication; -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -public class ReplicationRandomWalkIT extends ConfigurableMacBase { - - @Override - protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(TSERV_ARCHIVE_WALOGS, "false"); - cfg.setProperty(TSERV_WALOG_MAX_SIZE, "1M"); - cfg.setNumTservers(1); - } - - @Test(timeout = 5 * 60 * 1000) - public void runReplicationRandomWalkStep() throws Exception { - Replication r = new Replication(); - - Environment env = new Environment(new Properties()) { - @Override - public String getUserName() { - return "root"; - } - - @Override - public String getPassword() { - return ROOT_PASSWORD; - } - - @Override - public Connector getConnector() throws AccumuloException, AccumuloSecurityException { - return ReplicationRandomWalkIT.this.getConnector(); - } - - }; - r.visit(null, env, null); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java b/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java deleted file mode 100644 index b072aa7..0000000 --- a/test/src/test/java/org/apache/accumulo/test/replication/StatusCombinerMacIT.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test.replication; - -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.accumulo.cluster.ClusterUser; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; -import org.apache.accumulo.core.replication.ReplicationTable; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.harness.SharedMiniClusterBase; -import org.apache.accumulo.server.replication.StatusUtil; -import org.apache.accumulo.server.replication.proto.Replication.Status; -import org.apache.accumulo.server.util.ReplicationTableUtil; -import org.apache.hadoop.io.Text; -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.Iterables; - -public class StatusCombinerMacIT extends SharedMiniClusterBase { - - @Override - public int defaultTimeoutSeconds() { - return 60; - } - - @Test - public void testCombinerSetOnMetadata() throws Exception { - TableOperations tops = getConnector().tableOperations(); - Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(MetadataTable.NAME); - - Assert.assertTrue(iterators.containsKey(ReplicationTableUtil.COMBINER_NAME)); - EnumSet<IteratorScope> scopes = iterators.get(ReplicationTableUtil.COMBINER_NAME); - Assert.assertEquals(3, scopes.size()); - Assert.assertTrue(scopes.contains(IteratorScope.scan)); - Assert.assertTrue(scopes.contains(IteratorScope.minc)); - Assert.assertTrue(scopes.contains(IteratorScope.majc)); - - Iterable<Entry<String,String>> propIter = tops.getProperties(MetadataTable.NAME); - HashMap<String,String> properties = new HashMap<String,String>(); - for (Entry<String,String> entry : propIter) { - properties.put(entry.getKey(), entry.getValue()); - } - - for (IteratorScope scope : scopes) { - String key = Property.TABLE_ITERATOR_PREFIX.getKey() + scope.name() + "." 
+ ReplicationTableUtil.COMBINER_NAME + ".opt.columns"; - Assert.assertTrue("Properties did not contain key : " + key, properties.containsKey(key)); - Assert.assertEquals(MetadataSchema.ReplicationSection.COLF.toString(), properties.get(key)); - } - } - - @Test - public void test() throws Exception { - Connector conn = getConnector(); - ClusterUser user = getAdminUser(); - - ReplicationTable.setOnline(conn); - conn.securityOperations().grantTablePermission(user.getPrincipal(), ReplicationTable.NAME, TablePermission.WRITE); - BatchWriter bw = ReplicationTable.getBatchWriter(conn); - long createTime = System.currentTimeMillis(); - try { - Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3"); - StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(createTime)); - bw.addMutation(m); - } finally { - bw.close(); - } - - Scanner s = ReplicationTable.getScanner(conn); - Entry<Key,Value> entry = Iterables.getOnlyElement(s); - Assert.assertEquals(StatusUtil.fileCreatedValue(createTime), entry.getValue()); - - bw = ReplicationTable.getBatchWriter(conn); - try { - Mutation m = new Mutation("file:/accumulo/wal/HW10447.local+56808/93cdc17e-7521-44fa-87b5-37f45bcb92d3"); - StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.replicated(Long.MAX_VALUE))); - bw.addMutation(m); - } finally { - bw.close(); - } - - s = ReplicationTable.getScanner(conn); - entry = Iterables.getOnlyElement(s); - Status stat = Status.parseFrom(entry.getValue().get()); - Assert.assertEquals(Long.MAX_VALUE, stat.getBegin()); - } - -}
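As a worked example of the Status merging that both combinerWorksOnMetadata and StatusCombinerMacIT assert on, condensed from the deleted tests: two protobuf Status versions written under the same key are collapsed by the configured StatusCombiner into a single record that keeps the creation time and picks up the closed/infinite-end flags.

    Status stat1 = StatusUtil.fileCreated(100); // new file, createdTime 100, not yet closed
    Status stat2 = StatusUtil.fileClosed();     // closed, with an infinite end offset
    // After both versions are written to the same row and column, the combiner leaves:
    Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true)
        .setInfiniteEnd(true).setCreatedTime(100).build();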