ACCUMULO-3147 Refactor ReplicationTable constants
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1a47b38 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1a47b38 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1a47b38 Branch: refs/heads/master Commit: f1a47b38d3f5338a9ad1104cd3009b327e08c371 Parents: cc48e37 Author: Christopher Tubbs <ctubb...@apache.org> Authored: Wed Oct 29 15:11:48 2014 -0400 Committer: Christopher Tubbs <ctubb...@apache.org> Committed: Thu Nov 6 18:39:54 2014 -0500 ---------------------------------------------------------------------- .../client/impl/ReplicationOperationsImpl.java | 8 +- .../replication/PrintReplicationRecords.java | 2 +- .../core/replication/ReplicaSystemHelper.java | 11 +- .../core/replication/ReplicationConstants.java | 1 - .../core/replication/ReplicationTable.java | 61 +++++ .../ReplicationOperationsImplTest.java | 24 +- .../server/replication/ReplicationTable.java | 227 ------------------- .../server/replication/ReplicationUtil.java | 184 +++++++++++++-- .../replication/ReplicationTableTest.java | 11 +- .../gc/GarbageCollectWriteAheadLogs.java | 2 +- .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../CloseWriteAheadLogReferences.java | 2 +- .../gc/GarbageCollectWriteAheadLogsTest.java | 36 ++- .../CloseWriteAheadLogReferencesTest.java | 9 +- .../master/metrics/ReplicationMetrics.java | 4 +- .../DistributedWorkQueueWorkAssigner.java | 12 +- .../master/replication/FinishedWorkUpdater.java | 5 +- .../RemoveCompleteReplicationRecords.java | 4 +- .../master/replication/StatusMaker.java | 45 ++-- .../accumulo/master/replication/WorkMaker.java | 2 +- .../replication/FinishedWorkUpdaterTest.java | 15 +- .../RemoveCompleteReplicationRecordsTest.java | 13 +- .../replication/SequentialWorkAssignerTest.java | 11 +- .../master/replication/StatusMakerTest.java | 2 +- .../replication/UnorderedWorkAssignerTest.java | 16 +- .../master/replication/WorkMakerTest.java | 2 +- 
.../monitor/servlets/ReplicationServlet.java | 3 +- .../replication/ReplicationProcessor.java | 2 +- .../replication/MultiInstanceReplicationIT.java | 6 +- .../test/replication/ReplicationIT.java | 18 +- .../test/replication/StatusCombinerMacTest.java | 5 +- .../UnorderedWorkAssignerReplicationIT.java | 7 +- 32 files changed, 393 insertions(+), 361 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java index f820aa4..a8f3f21 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java @@ -44,8 +44,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; @@ -152,7 +152,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations { log.info("reading from replication table"); boolean allReplicationRefsReplicated = false; while (!allReplicationRefsReplicated) { - 
BatchScanner bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 4); + BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4); bs.setRanges(Collections.singleton(new Range())); try { allReplicationRefsReplicated = allReferencesReplicated(bs, tableId, wals); @@ -216,7 +216,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations { protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { TableOperations tops = conn.tableOperations(); - while (!tops.exists(ReplicationConstants.TABLE_NAME)) { + while (!tops.exists(ReplicationTable.NAME)) { UtilWaitThread.sleep(200); } @@ -232,7 +232,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations { } } - return new Text(strTableId); + return new Text(strTableId); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java index 2aef652..ee606b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java @@ -80,7 +80,7 @@ public class PrintReplicationRecords implements Runnable { out.println("--------------------------------------------------------------------"); try { - s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY); + s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY); } catch (TableNotFoundException e) { log.error("Replication table does not exist"); return; 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java index a022362..b27aa43 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class ReplicaSystemHelper { private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class); @@ -48,7 +48,7 @@ public class ReplicaSystemHelper { /** * Record the updated Status for this file and target - * + * * @param filePath * Path to file being replicated * @param status @@ -56,11 +56,12 @@ public class ReplicaSystemHelper { * @param target * Peer that was replicated to */ - public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken()); - BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); try { - log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString (status)); + log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString(status)); Mutation m = new Mutation(filePath.toString()); WorkSection.add(m, target.toText(), 
ProtobufUtil.toValue(status)); bw.addMutation(m); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java index 9815634..3d71681 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.replication; * */ public class ReplicationConstants { - public static final String TABLE_NAME = "replication"; // Constants for replication information in zookeeper public static final String ZOO_BASE = "/replication"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java new file mode 100644 index 0000000..2736762 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.replication; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +import com.google.common.collect.ImmutableMap; + +public class ReplicationTable { + + public static final String NAME = "replication"; + + public static final String COMBINER_NAME = "statuscombiner"; + + public static final String STATUS_LG_NAME = StatusSection.NAME.toString(); + public static final Set<Text> STATUS_LG_COLFAMS = Collections.singleton(StatusSection.NAME); + public static final String WORK_LG_NAME = WorkSection.NAME.toString(); + public static final Set<Text> WORK_LG_COLFAMS = Collections.singleton(WorkSection.NAME); + public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS); + public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName(); + + public static Scanner getScanner(Connector conn) throws TableNotFoundException { + 
return conn.createScanner(NAME, Authorizations.EMPTY); + } + + public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException { + return conn.createBatchWriter(NAME, new BatchWriterConfig()); + } + + public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException { + return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java index 7cc839f..54a9f8c 100644 --- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java @@ -67,14 +67,14 @@ public class ReplicationOperationsImplTest { @Test public void waitsUntilEntriesAreReplicated() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationConstants.TABLE_NAME); + conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo")); String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(file1); StatusSection.add(m, tableId, 
ProtobufUtil.toValue(stat)); @@ -136,7 +136,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId); bw.addMutation(m); @@ -163,7 +163,7 @@ public class ReplicationOperationsImplTest { @Test public void unrelatedReplicationRecordsDontBlockDrain() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationConstants.TABLE_NAME); + conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); conn.tableOperations().create("bar"); @@ -173,7 +173,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); @@ -227,7 +227,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId1); bw.addMutation(m); @@ -247,7 +247,7 @@ public class ReplicationOperationsImplTest { @Test public void inprogressReplicationRecordsBlockExecution() throws Exception { Connector conn = inst.getConnector("root", new 
PasswordToken("")); - conn.tableOperations().create(ReplicationConstants.TABLE_NAME); + conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); @@ -255,7 +255,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); @@ -313,7 +313,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); m = new Mutation(file1); m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus)); bw.addMutation(m); @@ -333,7 +333,7 @@ public class ReplicationOperationsImplTest { @Test public void laterCreatedLogsDontBlockExecution() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationConstants.TABLE_NAME); + conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); @@ -341,7 +341,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + BatchWriter bw = 
conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); bw.addMutation(m); @@ -395,7 +395,7 @@ public class ReplicationOperationsImplTest { System.out.println(e.getKey()); } - bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new BatchWriterConfig()); + bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId1); bw.addMutation(m); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java deleted file mode 100644 index e7f10dc..0000000 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.server.replication; - -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.IteratorSetting.Column; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.iterators.Combiner; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; -import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; -import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; -import org.apache.accumulo.core.replication.ReplicationConstants; -import org.apache.accumulo.core.replication.StatusFormatter; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.fate.util.UtilWaitThread; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableMap; - -public class ReplicationTable { - private static final Logger log = LoggerFactory.getLogger(ReplicationTable.class); - - public static final String NAME = ReplicationConstants.TABLE_NAME; - - public static final String COMBINER_NAME = "statuscombiner"; - - public static final String STATUS_LG_NAME = 
StatusSection.NAME.toString(); - public static final Set<Text> STATUS_LG_COLFAMS = Collections.singleton(StatusSection.NAME); - public static final String WORK_LG_NAME = WorkSection.NAME.toString(); - public static final Set<Text> WORK_LG_COLFAMS = Collections.singleton(WorkSection.NAME); - public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS); - public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName(); - - public static synchronized void create(Connector conn) { - TableOperations tops = conn.tableOperations(); - if (tops.exists(NAME)) { - if (configureReplicationTable(conn)) { - return; - } - } - - for (int i = 0; i < 5; i++) { - try { - if (!tops.exists(NAME)) { - tops.create(NAME, false); - } - break; - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Failed to create replication table", e); - } catch (TableExistsException e) { - // Shouldn't happen unless someone else made the table - } - log.error("Retrying table creation in 1 second..."); - UtilWaitThread.sleep(1000); - } - - for (int i = 0; i < 5; i++) { - if (configureReplicationTable(conn)) { - return; - } - - log.error("Failed to configure the replication table, retying..."); - UtilWaitThread.sleep(1000); - } - - throw new RuntimeException("Could not configure replication table"); - } - - /** - * Attempts to configure the replication table, will return false if it fails - * - * @param conn - * Connector for the instance - * @return True if the replication table is properly configured - */ - protected static synchronized boolean configureReplicationTable(Connector conn) { - try { - conn.securityOperations().grantTablePermission("root", NAME, TablePermission.READ); - } catch (AccumuloException | AccumuloSecurityException e) { - log.warn("Could not grant root user read access to replication table", e); - // Should this be fatal? 
It's only for convenience, all r/w is done by !SYSTEM - } - - TableOperations tops = conn.tableOperations(); - Map<String,EnumSet<IteratorScope>> iterators = null; - try { - iterators = tops.listIterators(NAME); - } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { - log.error("Could not fetch iterators for " + NAME, e); - return false; - } - - if (!iterators.containsKey(COMBINER_NAME)) { - // Set our combiner and combine all columns - IteratorSetting setting = new IteratorSetting(30, COMBINER_NAME, StatusCombiner.class); - Combiner.setColumns(setting, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME))); - try { - tops.attachIterator(NAME, setting); - } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { - log.error("Could not set StatusCombiner on replication table", e); - return false; - } - } - - Map<String,Set<Text>> localityGroups; - try { - localityGroups = tops.getLocalityGroups(NAME); - } catch (TableNotFoundException | AccumuloException e) { - log.error("Could not fetch locality groups", e); - return false; - } - - Set<Text> statusColfams = localityGroups.get(STATUS_LG_NAME), workColfams = localityGroups.get(WORK_LG_NAME); - if (null == statusColfams || null == workColfams) { - try { - tops.setLocalityGroups(NAME, LOCALITY_GROUPS); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - log.error("Could not set locality groups on replication table", e); - return false; - } - } - - // Make sure the StatusFormatter is set on the metadata table - Iterable<Entry<String,String>> properties; - try { - properties = tops.getProperties(NAME); - } catch (AccumuloException | TableNotFoundException e) { - log.error("Could not fetch table properties on replication table", e); - return false; - } - - boolean formatterConfigured = false; - for (Entry<String,String> property : properties) { - if 
(Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) { - if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) { - log.info("Changing formatter for {} table from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME); - try { - tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Could not set formatter on replication table", e); - return false; - } - } - - formatterConfigured = true; - - // Don't need to keep iterating over the properties after we found the one we were looking for - break; - } - } - - if (!formatterConfigured) { - try { - tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME); - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Could not set formatter on replication table", e); - return false; - } - } - - log.debug("Successfully configured replication table"); - - return true; - } - - public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException { - return conn.createScanner(NAME, auths); - } - - public static Scanner getScanner(Connector conn) throws TableNotFoundException { - return getScanner(conn, new Authorizations()); - } - - public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException { - return getBatchWriter(conn, new BatchWriterConfig()); - } - - public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException { - return conn.createBatchWriter(NAME, config); - } - - public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException { - return conn.createBatchScanner(NAME, new Authorizations(), queryThreads); - } - - public static boolean exists(Connector conn) { - return exists(conn.tableOperations()); - } - - public static boolean exists(TableOperations tops) { - return 
tops.exists(NAME); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java index 8e9612a..e05a08c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java @@ -18,7 +18,9 @@ package org.apache.accumulo.server.replication; import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -30,7 +32,10 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.replication.ReplicaSystem; @@ -39,15 +44,20 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Combiner; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; import 
org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -88,7 +98,9 @@ public class ReplicationUtil { /** * Extract replication peers from system configuration - * @param systemProperties System properties, typically from Connector.instanceOperations().getSystemConfiguration() + * + * @param systemProperties + * System properties, typically from Connector.instanceOperations().getSystemConfiguration() * @return Configured replication peers */ public Map<String,String> getPeers(Map<String,String> systemProperties) { @@ -99,11 +111,12 @@ public class ReplicationUtil { for (Entry<String,String> property : systemProperties.entrySet()) { String key = property.getKey(); // Filter out cruft that we don't want - if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) { + if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey()) + && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) { String peerName = property.getKey().substring(definedPeersPrefix.length()); ReplicaSystem replica; try { - replica = 
ReplicaSystemFactory.get(property.getValue()); + replica = ReplicaSystemFactory.get(property.getValue()); } catch (Exception e) { log.warn("Could not instantiate ReplicaSystem for {} with configuration {}", property.getKey(), property.getValue(), e); continue; @@ -162,7 +175,7 @@ public class ReplicationUtil { // Read over the queued work BatchScanner bs; try { - bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 4); + bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4); } catch (TableNotFoundException e) { log.debug("No replication table exists", e); return counts; @@ -194,9 +207,13 @@ public class ReplicationUtil { /** * Fetches the absolute path of the file to be replicated. - * @param conn Accumulo Connector - * @param workQueuePath Root path for the Replication WorkQueue - * @param queueKey The Replication work queue key + * + * @param conn + * Accumulo Connector + * @param workQueuePath + * Root path for the Replication WorkQueue + * @param queueKey + * The Replication work queue key * @return The absolute path for the file, or null if the key is no longer in ZooKeeper */ public String getAbsolutePath(Connector conn, String workQueuePath, String queueKey) { @@ -210,9 +227,13 @@ public class ReplicationUtil { /** * Compute a progress string for the replication of the given WAL - * @param conn Accumulo Connector - * @param path Absolute path to a WAL, or null - * @param target ReplicationTarget the WAL is being replicated to + * + * @param conn + * Accumulo Connector + * @param path + * Absolute path to a WAL, or null + * @param target + * ReplicationTarget the WAL is being replicated to * @return A status message for a file being replicated */ public String getProgress(Connector conn, String path, ReplicationTarget target) { @@ -222,7 +243,7 @@ public class ReplicationUtil { if (null != path) { Scanner s; try { - s = conn.createScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY); + s = 
conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY); } catch (TableNotFoundException e) { log.debug("Replication table no long exists", e); return status; @@ -236,8 +257,8 @@ public class ReplicationUtil { try { kv = Iterables.getOnlyElement(s); } catch (NoSuchElementException e) { - log.trace("Could not find status of {} replicating to {}", path, target); - status = "Unknown"; + log.trace("Could not find status of {} replicating to {}", path, target); + status = "Unknown"; } finally { s.close(); } @@ -267,9 +288,142 @@ public class ReplicationUtil { public Map<String,String> invert(Map<String,String> map) { Map<String,String> newMap = Maps.newHashMapWithExpectedSize(map.size()); - for(Entry<String,String> entry : map.entrySet()) { + for (Entry<String,String> entry : map.entrySet()) { newMap.put(entry.getValue(), entry.getKey()); } return newMap; } + + public static synchronized void createReplicationTable(Connector conn) { + TableOperations tops = conn.tableOperations(); + if (tops.exists(ReplicationTable.NAME)) { + if (configureReplicationTable(conn)) { + return; + } + } + + for (int i = 0; i < 5; i++) { + try { + if (!tops.exists(ReplicationTable.NAME)) { + tops.create(ReplicationTable.NAME, false); + } + break; + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Failed to create replication table", e); + } catch (TableExistsException e) { + // Shouldn't happen unless someone else made the table + } + log.error("Retrying table creation in 1 second..."); + UtilWaitThread.sleep(1000); + } + + for (int i = 0; i < 5; i++) { + if (configureReplicationTable(conn)) { + return; + } + + log.error("Failed to configure the replication table, retrying..."); + UtilWaitThread.sleep(1000); + } + + throw new RuntimeException("Could not configure replication table"); + } + + /** + * Attempts to configure the replication table, will return false if it fails + * + * @param conn + * Connector for the instance + * @return True if the replication 
table is properly configured + */ + protected static synchronized boolean configureReplicationTable(Connector conn) { + try { + conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ); + } catch (AccumuloException | AccumuloSecurityException e) { + log.warn("Could not grant root user read access to replication table", e); + // Should this be fatal? It's only for convenience, all r/w is done by !SYSTEM + } + + TableOperations tops = conn.tableOperations(); + Map<String,EnumSet<IteratorScope>> iterators = null; + try { + iterators = tops.listIterators(ReplicationTable.NAME); + } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { + log.error("Could not fetch iterators for " + ReplicationTable.NAME, e); + return false; + } + + if (!iterators.containsKey(ReplicationTable.COMBINER_NAME)) { + // Set our combiner and combine all columns + IteratorSetting setting = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, StatusCombiner.class); + Combiner.setColumns(setting, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME))); + try { + tops.attachIterator(ReplicationTable.NAME, setting); + } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { + log.error("Could not set StatusCombiner on replication table", e); + return false; + } + } + + Map<String,Set<Text>> localityGroups; + try { + localityGroups = tops.getLocalityGroups(ReplicationTable.NAME); + } catch (TableNotFoundException | AccumuloException e) { + log.error("Could not fetch locality groups", e); + return false; + } + + Set<Text> statusColfams = localityGroups.get(ReplicationTable.STATUS_LG_NAME), workColfams = localityGroups.get(ReplicationTable.WORK_LG_NAME); + if (null == statusColfams || null == workColfams) { + try { + tops.setLocalityGroups(ReplicationTable.NAME, ReplicationTable.LOCALITY_GROUPS); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) 
{ + log.error("Could not set locality groups on replication table", e); + return false; + } + } + + // Make sure the StatusFormatter is set on the replication table + Iterable<Entry<String,String>> properties; + try { + properties = tops.getProperties(ReplicationTable.NAME); + } catch (AccumuloException | TableNotFoundException e) { + log.error("Could not fetch table properties on replication table", e); + return false; + } + + boolean formatterConfigured = false; + for (Entry<String,String> property : properties) { + if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) { + if (!ReplicationTable.STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) { + log.info("Changing formatter for {} table from {} to {}", ReplicationTable.NAME, property.getValue(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); + try { + tops.setProperty(ReplicationTable.NAME, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Could not set formatter on replication table", e); + return false; + } + } + + formatterConfigured = true; + + // Don't need to keep iterating over the properties after we found the one we were looking for + break; + } + } + + if (!formatterConfigured) { + try { + tops.setProperty(ReplicationTable.NAME, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Could not set formatter on replication table", e); + return false; + } + } + + log.debug("Successfully configured replication table"); + + return true; + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java 
b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java index bb80c81..44ac3fe 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.iterators.Combiner; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Before; @@ -62,7 +63,7 @@ public class ReplicationTableTest { public void replicationTableCreated() { TableOperations tops = conn.tableOperations(); Assert.assertFalse(tops.exists(ReplicationTable.NAME)); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); Assert.assertTrue(tops.exists(ReplicationTable.NAME)); } @@ -72,14 +73,14 @@ public class ReplicationTableTest { Assert.assertFalse(tops.exists(ReplicationTable.NAME)); // Create the table - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); // Make sure it exists and save off the id Assert.assertTrue(tops.exists(ReplicationTable.NAME)); String tableId = tops.tableIdMap().get(ReplicationTable.NAME); // Try to make it again, should return quickly - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); Assert.assertTrue(tops.exists(ReplicationTable.NAME)); // Verify we have the same table as previously @@ -96,7 +97,7 @@ public class ReplicationTableTest { Assert.assertFalse(iterators.containsKey(ReplicationTable.COMBINER_NAME)); - ReplicationTable.configureReplicationTable(conn); + ReplicationUtil.configureReplicationTable(conn); // After configure the iterator should be set iterators = 
tops.listIterators(ReplicationTable.NAME); @@ -123,7 +124,7 @@ public class ReplicationTableTest { tops.delete(ReplicationTable.NAME); } - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); Set<String> iters = tops.listIterators(ReplicationTable.NAME).keySet(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 9646be9..9928d3c 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; @@ -58,7 +59,6 @@ import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index f943ac1..bbb2010 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; @@ -87,7 +88,6 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.accumulo.server.fs.VolumeUtil; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.security.SystemCredentials; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.Halt; @@ -756,7 +756,7 @@ public class SimpleGarbageCollector implements Iface { HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { - return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, + return 
TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, SslConnectionParams.forServer(config), 0).address; } catch (Exception ex) { log.fatal(ex, ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index 74da72d..d13edf7 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -38,12 +38,12 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.hadoop.fs.Path; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java 
---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index 74b0919..cd69efb 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -50,11 +50,12 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -116,12 +117,14 @@ public class GarbageCollectWriteAheadLogsTest { @Test public void testMapServersToFiles() { + // @formatter:off /* * Test fileToServerMap: * /dir1/server1/uuid1 -> server1 (new-style) * /dir1/uuid2 -> "" (old-style) * /dir3/server3/uuid3 -> server3 (new-style) */ + // @formatter:on Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>(); Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1); fileToServerMap.put(path1, "server1"); // new-style @@ -129,20 +132,24 @@ public class GarbageCollectWriteAheadLogsTest { fileToServerMap.put(path2, ""); // old-style Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3); fileToServerMap.put(path3, 
"server3"); // old-style + // @formatter:off /* * Test nameToFileMap: * uuid1 -> /dir1/server1/uuid1 * uuid3 -> /dir3/server3/uuid3 */ + // @formatter:on Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>(); nameToFileMap.put(UUID1, path1); nameToFileMap.put(UUID3, path3); - /** + // @formatter:off + /* * Expected map: * server1 -> [ /dir1/server1/uuid1 ] * server3 -> [ /dir3/server3/uuid3 ] */ + // @formatter:on Map<String,ArrayList<Path>> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap); assertEquals(2, result.size()); ArrayList<Path> list1 = result.get("server1"); @@ -157,6 +164,7 @@ public class GarbageCollectWriteAheadLogsTest { boolean isDir = (size == 0); return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path); } + private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception { expect(volMgr.listStatus(dir)).andReturn(fileStatuses); } @@ -164,6 +172,7 @@ public class GarbageCollectWriteAheadLogsTest { @Test public void testScanServers_NewStyle() throws Exception { String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"}; + // @formatter:off /* * Test directory layout: * /dir1/ @@ -176,6 +185,7 @@ public class GarbageCollectWriteAheadLogsTest { * server3/ * uuid3 */ + // @formatter:on Path serverDir1Path = new Path(DIR_1_PATH, "server1"); FileStatus serverDir1 = makeFileStatus(0, serverDir1Path); Path subDir2Path = new Path(DIR_1_PATH, "subdir2"); @@ -199,19 +209,23 @@ public class GarbageCollectWriteAheadLogsTest { Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>(); int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap); assertEquals(3, count); + // @formatter:off /* * Expected fileToServerMap: * /dir1/server1/uuid1 -> server1 * /dir3/server3/uuid3 -> server3 */ + // @formatter:on assertEquals(2, fileToServerMap.size()); assertEquals("server1", fileToServerMap.get(path1)); assertEquals("server3", fileToServerMap.get(path3)); + // 
@formatter:off /* * Expected nameToFileMap: * uuid1 -> /dir1/server1/uuid1 * uuid3 -> /dir3/server3/uuid3 */ + // @formatter:on assertEquals(2, nameToFileMap.size()); assertEquals(path1, nameToFileMap.get(UUID1)); assertEquals(path3, nameToFileMap.get(UUID3)); @@ -219,6 +233,7 @@ public class GarbageCollectWriteAheadLogsTest { @Test public void testScanServers_OldStyle() throws Exception { + // @formatter:off /* * Test directory layout: * /dir1/ @@ -226,6 +241,7 @@ public class GarbageCollectWriteAheadLogsTest { * /dir3/ * uuid3 */ + // @formatter:on String[] walDirs = new String[] {"/dir1", "/dir3"}; Path serverFile1Path = new Path(DIR_1_PATH, UUID1); FileStatus serverFile1 = makeFileStatus(100, serverFile1Path); @@ -242,19 +258,23 @@ public class GarbageCollectWriteAheadLogsTest { * Expect only a single server, the non-server entry for upgrade WALs */ assertEquals(1, count); + // @formatter:off /* * Expected fileToServerMap: * /dir1/uuid1 -> "" * /dir3/uuid3 -> "" */ + // @formatter:on assertEquals(2, fileToServerMap.size()); assertEquals("", fileToServerMap.get(serverFile1Path)); assertEquals("", fileToServerMap.get(serverFile3Path)); + // @formatter:off /* * Expected nameToFileMap: * uuid1 -> /dir1/uuid1 * uuid3 -> /dir3/uuid3 */ + // @formatter:on assertEquals(2, nameToFileMap.size()); assertEquals(serverFile1Path, nameToFileMap.get(UUID1)); assertEquals(serverFile3Path, nameToFileMap.get(UUID3)); @@ -263,6 +283,7 @@ public class GarbageCollectWriteAheadLogsTest { @Test public void testGetSortedWALogs() throws Exception { String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"}; + // @formatter:off /* * Test directory layout: * /dir1/ @@ -272,6 +293,7 @@ public class GarbageCollectWriteAheadLogsTest { * /dir3/ * uuid3 */ + // @formatter:on expect(volMgr.exists(DIR_1_PATH)).andReturn(true); expect(volMgr.exists(DIR_2_PATH)).andReturn(false); expect(volMgr.exists(DIR_3_PATH)).andReturn(true); @@ -285,11 +307,13 @@ public class 
GarbageCollectWriteAheadLogsTest { replay(volMgr); Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs); - /** + // @formatter:off + /* * Expected map: * uuid1 -> /dir1/uuid1 * uuid3 -> /dir3/uuid3 */ + // @formatter:on assertEquals(2, sortedWalogs.size()); assertEquals(path1, sortedWalogs.get(UUID1)); assertEquals(path3, sortedWalogs.get(UUID3)); @@ -360,7 +384,7 @@ public class GarbageCollectWriteAheadLogsTest { GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); long file1CreateTime = System.currentTimeMillis(); long file2CreateTime = file1CreateTime + 50; @@ -404,7 +428,7 @@ public class GarbageCollectWriteAheadLogsTest { GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); long file1CreateTime = System.currentTimeMillis(); long file2CreateTime = file1CreateTime + 50; @@ -464,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest { public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception { Instance inst = new MockInstance(testName.getMethodName()); Connector conn = inst.getConnector("root", new PasswordToken("")); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); long walCreateTime = System.currentTimeMillis(); String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index 
13d2263..4d99495 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -47,12 +47,13 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.apache.hadoop.io.Text; import org.easymock.IAnswer; import org.junit.Assert; @@ -308,7 +309,7 @@ public class CloseWriteAheadLogReferencesTest { Instance inst = new MockInstance(testName.getMethodName()); Connector conn = inst.getConnector("root", new PasswordToken("")); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wal/tserver+port/12345"); m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())); @@ -330,7 +331,7 @@ public class CloseWriteAheadLogReferencesTest { Instance inst = new MockInstance(testName.getMethodName()); Connector conn = inst.getConnector("root", new PasswordToken("")); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter bw = 
conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file); m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())); @@ -352,7 +353,7 @@ public class CloseWriteAheadLogReferencesTest { Instance inst = new MockInstance(testName.getMethodName()); Connector conn = inst.getConnector("root", new PasswordToken("")); - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); Mutation m = new Mutation(file); StatusSection.add(m, new Text("1"), ProtobufUtil.toValue(StatusUtil.ingestedUntil(1000))); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java index 06a653d..d4a9af2 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java @@ -29,7 +29,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.MasterClient; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; -import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.metrics.AbstractMetricsImpl; @@ -61,7 +61,7 @@ public class ReplicationMetrics extends 
AbstractMetricsImpl implements Replicati @Override public int getNumFilesPendingReplication() { - if (!tops.exists(ReplicationConstants.TABLE_NAME)) { + if (!tops.exists(ReplicationTable.NAME)) { return 0; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java index 3810372..8e1d036 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java @@ -32,13 +32,13 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.replication.WorkAssigner; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; @@ -252,14 +252,18 @@ public abstract class DistributedWorkQueueWorkAssigner implements 
WorkAssigner { /** * Queue the given work for the target - * @param path File to replicate - * @param target Target for the work + * + * @param path + * File to replicate + * @param target + * Target for the work * @return True if the work was queued, false otherwise */ protected abstract boolean queueWork(Path path, ReplicationTarget target); /** - * @param target Target for the work + * @param target + * Target for the work * @return Queued work for the given target */ protected abstract Set<String> getQueuedWork(ReplicationTarget target); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java index b816eab..8bc18eb 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java @@ -37,10 +37,10 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory; import 
com.google.protobuf.InvalidProtocolBufferException; /** - * Update the status record in the replication table with work - * that has been replicated to each configured peer. + * Update the status record in the replication table with work that has been replicated to each configured peer. */ public class FinishedWorkUpdater implements Runnable { private static final Logger log = LoggerFactory.getLogger(FinishedWorkUpdater.class); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java index c670d51..ef76916 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java @@ -39,10 +39,10 @@ import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,7 +182,7 @@ public class RemoveCompleteReplicationRecords implements Runnable { Long timeClosed = 
tableToTimeCreated.get(tableId); if (null == timeClosed) { tableToTimeCreated.put(tableId, status.getCreatedTime()); - } else if (timeClosed != status.getCreatedTime()){ + } else if (timeClosed != status.getCreatedTime()) { log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed, status.getCreatedTime()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java index 68dd005..c0f2a6c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java @@ -33,9 +33,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.hadoop.io.Text; @@ -45,8 +46,8 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.InvalidProtocolBufferException; /** - * Reads replication records from the metadata table and creates status records in the replication table. Deletes the record - * from the metadata table when it's closed. 
+ * Reads replication records from the metadata table and creates status records in the replication table. Deletes the record from the metadata table when it's + * closed. */ public class StatusMaker { private static final Logger log = LoggerFactory.getLogger(StatusMaker.class); @@ -64,7 +65,7 @@ public class StatusMaker { * Not for public use -- visible only for testing * <p> * Used to read records from a table other than 'metadata' - * + * * @param table * The table to read from */ @@ -92,7 +93,7 @@ public class StatusMaker { // Get a writer to the replication table if (null == replicationWriter) { // Ensures table exists and is properly configured - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); try { setBatchWriter(ReplicationTable.getBatchWriter(conn)); } catch (TableNotFoundException e) { @@ -105,7 +106,6 @@ public class StatusMaker { MetadataSchema.ReplicationSection.getFile(entry.getKey(), file); MetadataSchema.ReplicationSection.getTableId(entry.getKey(), tableId); - Status status; try { status = Status.parseFrom(entry.getValue().get()); @@ -180,13 +180,17 @@ public class StatusMaker { } /** - * Create a record to track when the file was closed to ensure that replication preference - * is given to files that have been closed the longest and allow the work assigner to try to - * replicate in order that data was ingested (avoid replay in different order) - * @param file File being replicated - * @param tableId Table ID the file was used by - * @param stat Status msg - * @param value Serialized version of the Status msg + * Create a record to track when the file was closed to ensure that replication preference is given to files that have been closed the longest and allow the + * work assigner to try to replicate in order that data was ingested (avoid replay in different order) + * + * @param file + * File being replicated + * @param tableId + * Table ID the file was used by + * @param stat + * Status msg + * @param value + * 
Serialized version of the Status msg */ protected boolean addOrderRecord(Text file, Text tableId, Status stat, Value value) { try { @@ -219,15 +223,14 @@ public class StatusMaker { } /** - * Because there is only one active Master, and thus one active StatusMaker, the only - * safe time that we can issue the delete for a Status which is closed is immediately - * after writing it to the replication table. + * Because there is only one active Master, and thus one active StatusMaker, the only safe time that we can issue the delete for a Status which is closed is + * immediately after writing it to the replication table. * <p> - * If we try to defer and delete these entries in another thread/process, we will have - * no assurance that the Status message was propagated to the replication table. It is - * easiest, in terms of concurrency, to do this all in one step. - * - * @param k The Key to delete + * If we try to defer and delete these entries in another thread/process, we will have no assurance that the Status message was propagated to the replication + * table. It is easiest, in terms of concurrency, to do this all in one step. 
+ * + * @param k + * The Key to delete */ protected void deleteStatusRecord(Key k) { log.debug("Deleting {} from metadata table as it's no longer needed", k.toStringNoTruncate()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java index eabcc84..95f607f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java @@ -33,12 +33,12 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.replication.ReplicationSchema; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.hadoop.io.DataOutputBuffer; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java 
b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java index 7633823..f8cc775 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java @@ -31,9 +31,10 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -43,7 +44,7 @@ import org.junit.rules.TestName; import com.google.common.collect.Iterables; /** - * + * */ public class FinishedWorkUpdaterTest { @@ -67,7 +68,7 @@ public class FinishedWorkUpdaterTest { @Test public void recordsWithProgressUpdateBothTables() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build(); @@ -97,15 +98,17 @@ public class FinishedWorkUpdaterTest { @Test public void chooseMinimumBeginOffset() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + // @formatter:off Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(), stat2 = 
Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(), stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(); ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"), target2 = new ReplicationTarget("peer2", "table2", "1"), target3 = new ReplicationTarget("peer3", "table3", "1"); + // @formatter:on // Create a single work record for a file to some peer BatchWriter bw = ReplicationTable.getBatchWriter(conn); @@ -133,15 +136,17 @@ public class FinishedWorkUpdaterTest { @Test public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); + // @formatter:off Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(), stat2 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(), stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(); ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"), target2 = new ReplicationTarget("peer2", "table2", "1"), target3 = new ReplicationTarget("peer3", "table3", "1"); + // @formatter:on // Create a single work record for a file to some peer BatchWriter bw = ReplicationTable.getBatchWriter(conn); http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java index ec19a94..2255e21 100644 --- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java @@ -37,10 +37,11 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.apache.hadoop.io.Text; import org.easymock.EasyMock; import org.junit.Assert; @@ -72,7 +73,7 @@ public class RemoveCompleteReplicationRecordsTest { @Test public void notYetReplicationRecordsIgnored() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter bw = ReplicationTable.getBatchWriter(conn); int numRecords = 3; for (int i = 0; i < numRecords; i++) { @@ -102,7 +103,7 @@ public class RemoveCompleteReplicationRecordsTest { @Test public void partiallyReplicatedRecordsIgnored() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter bw = ReplicationTable.getBatchWriter(conn); int numRecords = 3; Status.Builder builder = Status.newBuilder(); @@ -137,7 +138,7 @@ public class RemoveCompleteReplicationRecordsTest { @Test public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter replBw = 
ReplicationTable.getBatchWriter(conn); int numRecords = 3; @@ -190,7 +191,7 @@ public class RemoveCompleteReplicationRecordsTest { @Test public void replicatedClosedRowsAreRemoved() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter replBw = ReplicationTable.getBatchWriter(conn); int numRecords = 3; @@ -282,7 +283,7 @@ public class RemoveCompleteReplicationRecordsTest { @Test public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception { - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); BatchWriter replBw = ReplicationTable.getBatchWriter(conn); int numRecords = 3; http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java index 3d96ea1..d5bcfbd 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java @@ -37,13 +37,14 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.TablePermission; import 
org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper; -import org.apache.accumulo.server.replication.ReplicationTable; +import org.apache.accumulo.server.replication.ReplicationUtil; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.io.Text; @@ -85,7 +86,7 @@ public class SequentialWorkAssignerTest { assigner.setConnector(conn); // Create and grant ourselves write to the replication table - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); // Create two mutations, both of which need replication work done @@ -158,7 +159,7 @@ public class SequentialWorkAssignerTest { assigner.setConnector(conn); // Create and grant ourselves write to the replication table - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); // Create two mutations, both of which need replication work done @@ -238,7 +239,7 @@ public class SequentialWorkAssignerTest { assigner.setConnector(conn); // Create and grant ourselves write to the replication table - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); // Create two mutations, both of which need replication work done @@ -359,7 +360,7 @@ public class SequentialWorkAssignerTest { assigner.setConnector(conn); // Create and grant ourselves write to the replication table - ReplicationTable.create(conn); + ReplicationUtil.createReplicationTable(conn); conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE); // Create two mutations, both of which need 
replication work done http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java index 966e2bd..6a8fa4d 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java @@ -38,11 +38,11 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; -import org.apache.accumulo.server.replication.ReplicationTable; import org.apache.accumulo.server.util.ReplicationTableUtil; import org.apache.hadoop.io.Text; import org.junit.Assert;