ACCUMULO-3147 Refactor ReplicationTable constants

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f1a47b38
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f1a47b38
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f1a47b38

Branch: refs/heads/master
Commit: f1a47b38d3f5338a9ad1104cd3009b327e08c371
Parents: cc48e37
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Wed Oct 29 15:11:48 2014 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Thu Nov 6 18:39:54 2014 -0500

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |   8 +-
 .../replication/PrintReplicationRecords.java    |   2 +-
 .../core/replication/ReplicaSystemHelper.java   |  11 +-
 .../core/replication/ReplicationConstants.java  |   1 -
 .../core/replication/ReplicationTable.java      |  61 +++++
 .../ReplicationOperationsImplTest.java          |  24 +-
 .../server/replication/ReplicationTable.java    | 227 -------------------
 .../server/replication/ReplicationUtil.java     | 184 +++++++++++++--
 .../replication/ReplicationTableTest.java       |  11 +-
 .../gc/GarbageCollectWriteAheadLogs.java        |   2 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |   4 +-
 .../CloseWriteAheadLogReferences.java           |   2 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  36 ++-
 .../CloseWriteAheadLogReferencesTest.java       |   9 +-
 .../master/metrics/ReplicationMetrics.java      |   4 +-
 .../DistributedWorkQueueWorkAssigner.java       |  12 +-
 .../master/replication/FinishedWorkUpdater.java |   5 +-
 .../RemoveCompleteReplicationRecords.java       |   4 +-
 .../master/replication/StatusMaker.java         |  45 ++--
 .../accumulo/master/replication/WorkMaker.java  |   2 +-
 .../replication/FinishedWorkUpdaterTest.java    |  15 +-
 .../RemoveCompleteReplicationRecordsTest.java   |  13 +-
 .../replication/SequentialWorkAssignerTest.java |  11 +-
 .../master/replication/StatusMakerTest.java     |   2 +-
 .../replication/UnorderedWorkAssignerTest.java  |  16 +-
 .../master/replication/WorkMakerTest.java       |   2 +-
 .../monitor/servlets/ReplicationServlet.java    |   3 +-
 .../replication/ReplicationProcessor.java       |   2 +-
 .../replication/MultiInstanceReplicationIT.java |   6 +-
 .../test/replication/ReplicationIT.java         |  18 +-
 .../test/replication/StatusCombinerMacTest.java |   5 +-
 .../UnorderedWorkAssignerReplicationIT.java     |   7 +-
 32 files changed, 393 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index f820aa4..a8f3f21 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -44,8 +44,8 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -152,7 +152,7 @@ public class ReplicationOperationsImpl implements 
ReplicationOperations {
     log.info("reading from replication table");
     boolean allReplicationRefsReplicated = false;
     while (!allReplicationRefsReplicated) {
-      BatchScanner bs = 
conn.createBatchScanner(ReplicationConstants.TABLE_NAME, Authorizations.EMPTY, 
4);
+      BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, 
Authorizations.EMPTY, 4);
       bs.setRanges(Collections.singleton(new Range()));
       try {
         allReplicationRefsReplicated = allReferencesReplicated(bs, tableId, 
wals);
@@ -216,7 +216,7 @@ public class ReplicationOperationsImpl implements 
ReplicationOperations {
 
   protected Text getTableId(Connector conn, String tableName) throws 
AccumuloException, AccumuloSecurityException, TableNotFoundException {
     TableOperations tops = conn.tableOperations();
-    while (!tops.exists(ReplicationConstants.TABLE_NAME)) {
+    while (!tops.exists(ReplicationTable.NAME)) {
       UtilWaitThread.sleep(200);
     }
 
@@ -232,7 +232,7 @@ public class ReplicationOperationsImpl implements 
ReplicationOperations {
       }
     }
 
-    return new Text(strTableId);    
+    return new Text(strTableId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index 2aef652..ee606b5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -80,7 +80,7 @@ public class PrintReplicationRecords implements Runnable {
     
out.println("--------------------------------------------------------------------");
 
     try {
-      s = conn.createScanner(ReplicationConstants.TABLE_NAME, 
Authorizations.EMPTY);
+      s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
     } catch (TableNotFoundException e) {
       log.error("Replication table does not exist");
       return;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
index a022362..b27aa43 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class ReplicaSystemHelper {
   private static final Logger log = 
LoggerFactory.getLogger(ReplicaSystemHelper.class);
@@ -48,7 +48,7 @@ public class ReplicaSystemHelper {
 
   /**
    * Record the updated Status for this file and target
-   * 
+   *
    * @param filePath
    *          Path to file being replicated
    * @param status
@@ -56,11 +56,12 @@ public class ReplicaSystemHelper {
    * @param target
    *          Peer that was replicated to
    */
-  public void recordNewStatus(Path filePath, Status status, ReplicationTarget 
target) throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget 
target) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException {
     Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-    BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, 
new BatchWriterConfig());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     try {
-      log.debug("Recording new status for {}, {}", filePath.toString(), 
ProtobufUtil.toString (status));
+      log.debug("Recording new status for {}, {}", filePath.toString(), 
ProtobufUtil.toString(status));
       Mutation m = new Mutation(filePath.toString());
       WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
       bw.addMutation(m);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
index 9815634..3d71681 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationConstants.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.core.replication;
  * 
  */
 public class ReplicationConstants {
-  public static final String TABLE_NAME = "replication";
 
   // Constants for replication information in zookeeper
   public static final String ZOO_BASE = "/replication";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
new file mode 100644
index 0000000..2736762
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ReplicationTable {
+
+  public static final String NAME = "replication";
+
+  public static final String COMBINER_NAME = "statuscombiner";
+
+  public static final String STATUS_LG_NAME = StatusSection.NAME.toString();
+  public static final Set<Text> STATUS_LG_COLFAMS = 
Collections.singleton(StatusSection.NAME);
+  public static final String WORK_LG_NAME = WorkSection.NAME.toString();
+  public static final Set<Text> WORK_LG_COLFAMS = 
Collections.singleton(WorkSection.NAME);
+  public static final Map<String,Set<Text>> LOCALITY_GROUPS = 
ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, 
WORK_LG_COLFAMS);
+  public static final String STATUS_FORMATTER_CLASS_NAME = 
StatusFormatter.class.getName();
+
+  public static Scanner getScanner(Connector conn) throws 
TableNotFoundException {
+    return conn.createScanner(NAME, Authorizations.EMPTY);
+  }
+
+  public static BatchWriter getBatchWriter(Connector conn) throws 
TableNotFoundException {
+    return conn.createBatchWriter(NAME, new BatchWriterConfig());
+  }
+
+  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) 
throws TableNotFoundException {
+    return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
index 7cc839f..54a9f8c 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java
@@ -67,14 +67,14 @@ public class ReplicationOperationsImplTest {
   @Test
   public void waitsUntilEntriesAreReplicated() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
+    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
     Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
 
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = 
"/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, 
new BatchWriterConfig());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
@@ -136,7 +136,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new 
BatchWriterConfig());
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId);
     bw.addMutation(m);
@@ -163,7 +163,7 @@ public class ReplicationOperationsImplTest {
   @Test
   public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
+    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
     conn.tableOperations().create("bar");
 
@@ -173,7 +173,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = 
"/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, 
new BatchWriterConfig());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -227,7 +227,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new 
BatchWriterConfig());
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId1);
     bw.addMutation(m);
@@ -247,7 +247,7 @@ public class ReplicationOperationsImplTest {
   @Test
   public void inprogressReplicationRecordsBlockExecution() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
+    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
 
     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -255,7 +255,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, 
new BatchWriterConfig());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
 
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
@@ -313,7 +313,7 @@ public class ReplicationOperationsImplTest {
     Assert.assertFalse(done.get());
 
     // Remove the replication entries too
-    bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new 
BatchWriterConfig());
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     m = new Mutation(file1);
     m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
     bw.addMutation(m);
@@ -333,7 +333,7 @@ public class ReplicationOperationsImplTest {
   @Test
   public void laterCreatedLogsDontBlockExecution() throws Exception {
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    conn.tableOperations().create(ReplicationConstants.TABLE_NAME);
+    conn.tableOperations().create(ReplicationTable.NAME);
     conn.tableOperations().create("foo");
 
     Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
@@ -341,7 +341,7 @@ public class ReplicationOperationsImplTest {
     String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
 
-    BatchWriter bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, 
new BatchWriterConfig());
+    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     Mutation m = new Mutation(file1);
     StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
     bw.addMutation(m);
@@ -395,7 +395,7 @@ public class ReplicationOperationsImplTest {
       System.out.println(e.getKey());
     }
 
-    bw = conn.createBatchWriter(ReplicationConstants.TABLE_NAME, new 
BatchWriterConfig());
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     m = new Mutation(file1);
     m.putDelete(StatusSection.NAME, tableId1);
     bw.addMutation(m);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
 
b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
deleted file mode 100644
index e7f10dc..0000000
--- 
a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.iterators.Combiner;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.StatusFormatter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-
-public class ReplicationTable {
-  private static final Logger log = 
LoggerFactory.getLogger(ReplicationTable.class);
-
-  public static final String NAME = ReplicationConstants.TABLE_NAME;
-
-  public static final String COMBINER_NAME = "statuscombiner";
-
-  public static final String STATUS_LG_NAME = StatusSection.NAME.toString();
-  public static final Set<Text> STATUS_LG_COLFAMS = 
Collections.singleton(StatusSection.NAME);
-  public static final String WORK_LG_NAME = WorkSection.NAME.toString();
-  public static final Set<Text> WORK_LG_COLFAMS = 
Collections.singleton(WorkSection.NAME);
-  public static final Map<String,Set<Text>> LOCALITY_GROUPS = 
ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, 
WORK_LG_COLFAMS);
-  public static final String STATUS_FORMATTER_CLASS_NAME = 
StatusFormatter.class.getName();
-
-  public static synchronized void create(Connector conn) {
-    TableOperations tops = conn.tableOperations();
-    if (tops.exists(NAME)) {
-      if (configureReplicationTable(conn)) {
-        return;
-      }
-    }
-
-    for (int i = 0; i < 5; i++) {
-      try {
-        if (!tops.exists(NAME)) {
-          tops.create(NAME, false);
-        }
-        break;
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        log.error("Failed to create replication table", e);
-      } catch (TableExistsException e) {
-        // Shouldn't happen unless someone else made the table
-      }
-      log.error("Retrying table creation in 1 second...");
-      UtilWaitThread.sleep(1000);
-    }
-
-    for (int i = 0; i < 5; i++) {
-      if (configureReplicationTable(conn)) {
-        return;
-      }
-
-      log.error("Failed to configure the replication table, retying...");
-      UtilWaitThread.sleep(1000);
-    }
-
-    throw new RuntimeException("Could not configure replication table");
-  }
-
-  /**
-   * Attempts to configure the replication table, will return false if it fails
-   * 
-   * @param conn
-   *          Connector for the instance
-   * @return True if the replication table is properly configured
-   */
-  protected static synchronized boolean configureReplicationTable(Connector 
conn) {
-    try {
-      conn.securityOperations().grantTablePermission("root", NAME, 
TablePermission.READ);
-    } catch (AccumuloException | AccumuloSecurityException e) {
-      log.warn("Could not grant root user read access to replication table", 
e);
-      // Should this be fatal? It's only for convenience, all r/w is done by 
!SYSTEM
-    }
-
-    TableOperations tops = conn.tableOperations();
-    Map<String,EnumSet<IteratorScope>> iterators = null;
-    try {
-      iterators = tops.listIterators(NAME);
-    } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
-      log.error("Could not fetch iterators for " + NAME, e);
-      return false;
-    }
-
-    if (!iterators.containsKey(COMBINER_NAME)) {
-      // Set our combiner and combine all columns
-      IteratorSetting setting = new IteratorSetting(30, COMBINER_NAME, 
StatusCombiner.class);
-      Combiner.setColumns(setting, Arrays.asList(new 
Column(StatusSection.NAME), new Column(WorkSection.NAME)));
-      try {
-        tops.attachIterator(NAME, setting);
-      } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
-        log.error("Could not set StatusCombiner on replication table", e);
-        return false;
-      }
-    }
-
-    Map<String,Set<Text>> localityGroups;
-    try {
-      localityGroups = tops.getLocalityGroups(NAME);
-    } catch (TableNotFoundException | AccumuloException e) {
-      log.error("Could not fetch locality groups", e);
-      return false;
-    }
-
-    Set<Text> statusColfams = localityGroups.get(STATUS_LG_NAME), workColfams 
= localityGroups.get(WORK_LG_NAME);
-    if (null == statusColfams || null == workColfams) {
-      try {
-        tops.setLocalityGroups(NAME, LOCALITY_GROUPS);
-      } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
-        log.error("Could not set locality groups on replication table", e);
-        return false;
-      }
-    }
-
-    // Make sure the StatusFormatter is set on the metadata table
-    Iterable<Entry<String,String>> properties;
-    try {
-      properties = tops.getProperties(NAME);
-    } catch (AccumuloException | TableNotFoundException e) {
-      log.error("Could not fetch table properties on replication table", e);
-      return false;
-    }
-
-    boolean formatterConfigured = false;
-    for (Entry<String,String> property : properties) {
-      if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
-        if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
-          log.info("Changing formatter for {} table from {} to {}", NAME, 
property.getValue(), STATUS_FORMATTER_CLASS_NAME);
-          try {
-            tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), 
STATUS_FORMATTER_CLASS_NAME);
-          } catch (AccumuloException | AccumuloSecurityException e) {
-            log.error("Could not set formatter on replication table", e);
-            return false;
-          }
-        }
-
-        formatterConfigured = true;
-
-        // Don't need to keep iterating over the properties after we found the 
one we were looking for
-        break;
-      }
-    }
-
-    if (!formatterConfigured) {
-      try {
-        tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), 
STATUS_FORMATTER_CLASS_NAME);
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        log.error("Could not set formatter on replication table", e);
-        return false;
-      }
-    }
-
-    log.debug("Successfully configured replication table");
-
-    return true;
-  }
-
-  public static Scanner getScanner(Connector conn, Authorizations auths) 
throws TableNotFoundException {
-    return conn.createScanner(NAME, auths);
-  }
-
-  public static Scanner getScanner(Connector conn) throws 
TableNotFoundException {
-    return getScanner(conn, new Authorizations());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn) throws 
TableNotFoundException {
-    return getBatchWriter(conn, new BatchWriterConfig());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig 
config) throws TableNotFoundException {
-    return conn.createBatchWriter(NAME, config);
-  }
-
-  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) 
throws TableNotFoundException {
-    return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
-  }
-
-  public static boolean exists(Connector conn) {
-    return exists(conn.tableOperations());
-  }
-
-  public static boolean exists(TableOperations tops) {
-    return tops.exists(NAME);
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
index 8e9612a..e05a08c 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java
@@ -18,7 +18,9 @@ package org.apache.accumulo.server.replication;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -30,7 +32,10 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
@@ -39,15 +44,20 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Combiner;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -88,7 +98,9 @@ public class ReplicationUtil {
 
   /**
    * Extract replication peers from system configuration
-   * @param systemProperties System properties, typically from 
Connector.instanceOperations().getSystemConfiguration()
+   *
+   * @param systemProperties
+   *          System properties, typically from 
Connector.instanceOperations().getSystemConfiguration()
    * @return Configured replication peers
    */
   public Map<String,String> getPeers(Map<String,String> systemProperties) {
@@ -99,11 +111,12 @@ public class ReplicationUtil {
     for (Entry<String,String> property : systemProperties.entrySet()) {
       String key = property.getKey();
       // Filter out cruft that we don't want
-      if (key.startsWith(definedPeersPrefix) && 
!key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && 
!key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
+      if (key.startsWith(definedPeersPrefix) && 
!key.startsWith(Property.REPLICATION_PEER_USER.getKey())
+          && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
         String peerName = 
property.getKey().substring(definedPeersPrefix.length());
         ReplicaSystem replica;
         try {
-         replica = ReplicaSystemFactory.get(property.getValue());
+          replica = ReplicaSystemFactory.get(property.getValue());
         } catch (Exception e) {
           log.warn("Could not instantiate ReplicaSystem for {} with 
configuration {}", property.getKey(), property.getValue(), e);
           continue;
@@ -162,7 +175,7 @@ public class ReplicationUtil {
     // Read over the queued work
     BatchScanner bs;
     try {
-      bs = conn.createBatchScanner(ReplicationConstants.TABLE_NAME, 
Authorizations.EMPTY, 4);
+      bs = conn.createBatchScanner(ReplicationTable.NAME, 
Authorizations.EMPTY, 4);
     } catch (TableNotFoundException e) {
       log.debug("No replication table exists", e);
       return counts;
@@ -194,9 +207,13 @@ public class ReplicationUtil {
 
   /**
    * Fetches the absolute path of the file to be replicated.
-   * @param conn Accumulo Connector
-   * @param workQueuePath Root path for the Replication WorkQueue
-   * @param queueKey The Replication work queue key
+   *
+   * @param conn
+   *          Accumulo Connector
+   * @param workQueuePath
+   *          Root path for the Replication WorkQueue
+   * @param queueKey
+   *          The Replication work queue key
    * @return The absolute path for the file, or null if the key is no longer 
in ZooKeeper
    */
   public String getAbsolutePath(Connector conn, String workQueuePath, String 
queueKey) {
@@ -210,9 +227,13 @@ public class ReplicationUtil {
 
   /**
    * Compute a progress string for the replication of the given WAL
-   * @param conn Accumulo Connector
-   * @param path Absolute path to a WAL, or null
-   * @param target ReplicationTarget the WAL is being replicated to
+   *
+   * @param conn
+   *          Accumulo Connector
+   * @param path
+   *          Absolute path to a WAL, or null
+   * @param target
+   *          ReplicationTarget the WAL is being replicated to
    * @return A status message for a file being replicated
    */
   public String getProgress(Connector conn, String path, ReplicationTarget 
target) {
@@ -222,7 +243,7 @@ public class ReplicationUtil {
     if (null != path) {
       Scanner s;
       try {
-        s = conn.createScanner(ReplicationConstants.TABLE_NAME, 
Authorizations.EMPTY);
+        s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
       } catch (TableNotFoundException e) {
         log.debug("Replication table no long exists", e);
         return status;
@@ -236,8 +257,8 @@ public class ReplicationUtil {
       try {
         kv = Iterables.getOnlyElement(s);
       } catch (NoSuchElementException e) {
-       log.trace("Could not find status of {} replicating to {}", path, 
target);
-       status = "Unknown";
+        log.trace("Could not find status of {} replicating to {}", path, 
target);
+        status = "Unknown";
       } finally {
         s.close();
       }
@@ -267,9 +288,142 @@ public class ReplicationUtil {
 
   public Map<String,String> invert(Map<String,String> map) {
     Map<String,String> newMap = Maps.newHashMapWithExpectedSize(map.size());
-    for(Entry<String,String> entry : map.entrySet()) {
+    for (Entry<String,String> entry : map.entrySet()) {
       newMap.put(entry.getValue(), entry.getKey());
     }
     return newMap;
   }
+
+  public static synchronized void createReplicationTable(Connector conn) {
+    TableOperations tops = conn.tableOperations();
+    if (tops.exists(ReplicationTable.NAME)) {
+      if (configureReplicationTable(conn)) {
+        return;
+      }
+    }
+
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (!tops.exists(ReplicationTable.NAME)) {
+          tops.create(ReplicationTable.NAME, false);
+        }
+        break;
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        log.error("Failed to create replication table", e);
+      } catch (TableExistsException e) {
+        // Shouldn't happen unless someone else made the table
+      }
+      log.error("Retrying table creation in 1 second...");
+      UtilWaitThread.sleep(1000);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      if (configureReplicationTable(conn)) {
+        return;
+      }
+
+      log.error("Failed to configure the replication table, retying...");
+      UtilWaitThread.sleep(1000);
+    }
+
+    throw new RuntimeException("Could not configure replication table");
+  }
+
+  /**
+   * Attempts to configure the replication table, will return false if it fails
+   *
+   * @param conn
+   *          Connector for the instance
+   * @return True if the replication table is properly configured
+   */
+  protected static synchronized boolean configureReplicationTable(Connector 
conn) {
+    try {
+      conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.READ);
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.warn("Could not grant root user read access to replication table", 
e);
+      // Should this be fatal? It's only for convenience, all r/w is done by 
!SYSTEM
+    }
+
+    TableOperations tops = conn.tableOperations();
+    Map<String,EnumSet<IteratorScope>> iterators = null;
+    try {
+      iterators = tops.listIterators(ReplicationTable.NAME);
+    } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
+      log.error("Could not fetch iterators for " + ReplicationTable.NAME, e);
+      return false;
+    }
+
+    if (!iterators.containsKey(ReplicationTable.COMBINER_NAME)) {
+      // Set our combiner and combine all columns
+      IteratorSetting setting = new IteratorSetting(30, 
ReplicationTable.COMBINER_NAME, StatusCombiner.class);
+      Combiner.setColumns(setting, Arrays.asList(new 
Column(StatusSection.NAME), new Column(WorkSection.NAME)));
+      try {
+        tops.attachIterator(ReplicationTable.NAME, setting);
+      } catch (AccumuloSecurityException | AccumuloException | 
TableNotFoundException e) {
+        log.error("Could not set StatusCombiner on replication table", e);
+        return false;
+      }
+    }
+
+    Map<String,Set<Text>> localityGroups;
+    try {
+      localityGroups = tops.getLocalityGroups(ReplicationTable.NAME);
+    } catch (TableNotFoundException | AccumuloException e) {
+      log.error("Could not fetch locality groups", e);
+      return false;
+    }
+
+    Set<Text> statusColfams = 
localityGroups.get(ReplicationTable.STATUS_LG_NAME), workColfams = 
localityGroups.get(ReplicationTable.WORK_LG_NAME);
+    if (null == statusColfams || null == workColfams) {
+      try {
+        tops.setLocalityGroups(ReplicationTable.NAME, 
ReplicationTable.LOCALITY_GROUPS);
+      } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
+        log.error("Could not set locality groups on replication table", e);
+        return false;
+      }
+    }
+
+    // Make sure the StatusFormatter is set on the metadata table
+    Iterable<Entry<String,String>> properties;
+    try {
+      properties = tops.getProperties(ReplicationTable.NAME);
+    } catch (AccumuloException | TableNotFoundException e) {
+      log.error("Could not fetch table properties on replication table", e);
+      return false;
+    }
+
+    boolean formatterConfigured = false;
+    for (Entry<String,String> property : properties) {
+      if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
+        if 
(!ReplicationTable.STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
+          log.info("Changing formatter for {} table from {} to {}", 
ReplicationTable.NAME, property.getValue(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
+          try {
+            tops.setProperty(ReplicationTable.NAME, 
Property.TABLE_FORMATTER_CLASS.getKey(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
+          } catch (AccumuloException | AccumuloSecurityException e) {
+            log.error("Could not set formatter on replication table", e);
+            return false;
+          }
+        }
+
+        formatterConfigured = true;
+
+        // Don't need to keep iterating over the properties after we found the 
one we were looking for
+        break;
+      }
+    }
+
+    if (!formatterConfigured) {
+      try {
+        tops.setProperty(ReplicationTable.NAME, 
Property.TABLE_FORMATTER_CLASS.getKey(), 
ReplicationTable.STATUS_FORMATTER_CLASS_NAME);
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        log.error("Could not set formatter on replication table", e);
+        return false;
+      }
+    }
+
+    log.debug("Successfully configured replication table");
+
+    return true;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
----------------------------------------------------------------------
diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
 
b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
index bb80c81..44ac3fe 100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.iterators.Combiner;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Before;
@@ -62,7 +63,7 @@ public class ReplicationTableTest {
   public void replicationTableCreated() {
     TableOperations tops = conn.tableOperations();
     Assert.assertFalse(tops.exists(ReplicationTable.NAME));
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     Assert.assertTrue(tops.exists(ReplicationTable.NAME));
   }
 
@@ -72,14 +73,14 @@ public class ReplicationTableTest {
     Assert.assertFalse(tops.exists(ReplicationTable.NAME));
 
     // Create the table
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     // Make sure it exists and save off the id
     Assert.assertTrue(tops.exists(ReplicationTable.NAME));
     String tableId = tops.tableIdMap().get(ReplicationTable.NAME);
 
     // Try to make it again, should return quickly
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     Assert.assertTrue(tops.exists(ReplicationTable.NAME));
 
     // Verify we have the same table as previously
@@ -96,7 +97,7 @@ public class ReplicationTableTest {
 
     Assert.assertFalse(iterators.containsKey(ReplicationTable.COMBINER_NAME));
 
-    ReplicationTable.configureReplicationTable(conn);
+    ReplicationUtil.configureReplicationTable(conn);
 
     // After configure the iterator should be set
     iterators = tops.listIterators(ReplicationTable.NAME);
@@ -123,7 +124,7 @@ public class ReplicationTableTest {
       tops.delete(ReplicationTable.NAME);
     }
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     Set<String> iters = tops.listIterators(ReplicationTable.NAME).keySet();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 9646be9..9928d3c 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -58,7 +59,6 @@ import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java 
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index f943ac1..bbb2010 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -62,6 +62,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
@@ -87,7 +88,6 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.util.Halt;
@@ -756,7 +756,7 @@ public class SimpleGarbageCollector implements Iface {
     HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
     log.debug("Starting garbage collector listening on " + result);
     try {
-      return TServerUtils.startTServer(result, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2, 
+      return TServerUtils.startTServer(result, processor, 
this.getClass().getSimpleName(), "GC Monitor Service", 2,
           config.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, 
maxMessageSize, SslConnectionParams.forServer(config), 0).address;
     } catch (Exception ex) {
       log.fatal(ex, ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
 
b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 74da72d..d13edf7 100644
--- 
a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -38,12 +38,12 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 74b0919..cd69efb 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -50,11 +50,12 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -116,12 +117,14 @@ public class GarbageCollectWriteAheadLogsTest {
 
   @Test
   public void testMapServersToFiles() {
+    // @formatter:off
     /*
      * Test fileToServerMap:
      * /dir1/server1/uuid1 -> server1 (new-style)
      * /dir1/uuid2 -> "" (old-style)
      * /dir3/server3/uuid3 -> server3 (new-style)
      */
+    // @formatter:on
     Map<Path,String> fileToServerMap = new java.util.HashMap<Path,String>();
     Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1);
     fileToServerMap.put(path1, "server1"); // new-style
@@ -129,20 +132,24 @@ public class GarbageCollectWriteAheadLogsTest {
     fileToServerMap.put(path2, ""); // old-style
     Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3);
     fileToServerMap.put(path3, "server3"); // old-style
+    // @formatter:off
     /*
      * Test nameToFileMap:
      * uuid1 -> /dir1/server1/uuid1
      * uuid3 -> /dir3/server3/uuid3
      */
+    // @formatter:on
     Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
     nameToFileMap.put(UUID1, path1);
     nameToFileMap.put(UUID3, path3);
 
-    /**
+    // @formatter:off
+    /*
      * Expected map:
      * server1 -> [ /dir1/server1/uuid1 ]
      * server3 -> [ /dir3/server3/uuid3 ]
      */
+    // @formatter:on
     Map<String,ArrayList<Path>> result = 
GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap);
     assertEquals(2, result.size());
     ArrayList<Path> list1 = result.get("server1");
@@ -157,6 +164,7 @@ public class GarbageCollectWriteAheadLogsTest {
     boolean isDir = (size == 0);
     return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path);
   }
+
   private void mockListStatus(Path dir, FileStatus... fileStatuses) throws 
Exception {
     expect(volMgr.listStatus(dir)).andReturn(fileStatuses);
   }
@@ -164,6 +172,7 @@ public class GarbageCollectWriteAheadLogsTest {
   @Test
   public void testScanServers_NewStyle() throws Exception {
     String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+    // @formatter:off
     /*
      * Test directory layout:
      * /dir1/
@@ -176,6 +185,7 @@ public class GarbageCollectWriteAheadLogsTest {
      *   server3/
      *     uuid3
      */
+    // @formatter:on
     Path serverDir1Path = new Path(DIR_1_PATH, "server1");
     FileStatus serverDir1 = makeFileStatus(0, serverDir1Path);
     Path subDir2Path = new Path(DIR_1_PATH, "subdir2");
@@ -199,19 +209,23 @@ public class GarbageCollectWriteAheadLogsTest {
     Map<String,Path> nameToFileMap = new java.util.HashMap<String,Path>();
     int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap);
     assertEquals(3, count);
+    // @formatter:off
     /*
      * Expected fileToServerMap:
      * /dir1/server1/uuid1 -> server1
      * /dir3/server3/uuid3 -> server3
      */
+    // @formatter:on
     assertEquals(2, fileToServerMap.size());
     assertEquals("server1", fileToServerMap.get(path1));
     assertEquals("server3", fileToServerMap.get(path3));
+    // @formatter:off
     /*
      * Expected nameToFileMap:
      * uuid1 -> /dir1/server1/uuid1
      * uuid3 -> /dir3/server3/uuid3
      */
+    // @formatter:on
     assertEquals(2, nameToFileMap.size());
     assertEquals(path1, nameToFileMap.get(UUID1));
     assertEquals(path3, nameToFileMap.get(UUID3));
@@ -219,6 +233,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   @Test
   public void testScanServers_OldStyle() throws Exception {
+    // @formatter:off
     /*
      * Test directory layout:
      * /dir1/
@@ -226,6 +241,7 @@ public class GarbageCollectWriteAheadLogsTest {
      * /dir3/
      *   uuid3
      */
+    // @formatter:on
     String[] walDirs = new String[] {"/dir1", "/dir3"};
     Path serverFile1Path = new Path(DIR_1_PATH, UUID1);
     FileStatus serverFile1 = makeFileStatus(100, serverFile1Path);
@@ -242,19 +258,23 @@ public class GarbageCollectWriteAheadLogsTest {
      * Expect only a single server, the non-server entry for upgrade WALs
      */
     assertEquals(1, count);
+    // @formatter:off
     /*
      * Expected fileToServerMap:
      * /dir1/uuid1 -> ""
      * /dir3/uuid3 -> ""
      */
+    // @formatter:on
     assertEquals(2, fileToServerMap.size());
     assertEquals("", fileToServerMap.get(serverFile1Path));
     assertEquals("", fileToServerMap.get(serverFile3Path));
+    // @formatter:off
     /*
      * Expected nameToFileMap:
      * uuid1 -> /dir1/uuid1
      * uuid3 -> /dir3/uuid3
      */
+    // @formatter:on
     assertEquals(2, nameToFileMap.size());
     assertEquals(serverFile1Path, nameToFileMap.get(UUID1));
     assertEquals(serverFile3Path, nameToFileMap.get(UUID3));
@@ -263,6 +283,7 @@ public class GarbageCollectWriteAheadLogsTest {
   @Test
   public void testGetSortedWALogs() throws Exception {
     String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"};
+    // @formatter:off
     /*
      * Test directory layout:
      * /dir1/
@@ -272,6 +293,7 @@ public class GarbageCollectWriteAheadLogsTest {
      * /dir3/
      *   uuid3
      */
+    // @formatter:on
     expect(volMgr.exists(DIR_1_PATH)).andReturn(true);
     expect(volMgr.exists(DIR_2_PATH)).andReturn(false);
     expect(volMgr.exists(DIR_3_PATH)).andReturn(true);
@@ -285,11 +307,13 @@ public class GarbageCollectWriteAheadLogsTest {
     replay(volMgr);
 
     Map<String,Path> sortedWalogs = gcwal.getSortedWALogs(recoveryDirs);
-    /**
+    // @formatter:off
+    /*
      * Expected map:
      * uuid1 -> /dir1/uuid1
      * uuid3 -> /dir3/uuid3
      */
+    // @formatter:on
     assertEquals(2, sortedWalogs.size());
     assertEquals(path1, sortedWalogs.get(UUID1));
     assertEquals(path3, sortedWalogs.get(UUID3));
@@ -360,7 +384,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -404,7 +428,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     GarbageCollectWriteAheadLogs gcWALs = new 
GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -464,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void fetchesReplicationEntriesFromMetadataAndReplicationTables() 
throws Exception {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     long walCreateTime = System.currentTimeMillis();
     String wal = 
"hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git 
a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
 
b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 13d2263..4d99495 100644
--- 
a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ 
b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -47,12 +47,13 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.io.Text;
 import org.easymock.IAnswer;
 import org.junit.Assert;
@@ -308,7 +309,7 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new 
BatchWriterConfig());
     Mutation m = new Mutation(ReplicationSection.getRowPrefix() + 
"file:/accumulo/wal/tserver+port/12345");
     m.put(ReplicationSection.COLF, new Text("1"), 
StatusUtil.fileCreatedValue(System.currentTimeMillis()));
@@ -330,7 +331,7 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new 
BatchWriterConfig());
     Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
     m.put(ReplicationSection.COLF, new Text("1"), 
StatusUtil.fileCreatedValue(System.currentTimeMillis()));
@@ -352,7 +353,7 @@ public class CloseWriteAheadLogReferencesTest {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
 
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig());
     Mutation m = new Mutation(file);
     StatusSection.add(m, new Text("1"), 
ProtobufUtil.toValue(StatusUtil.ingestedUntil(1000)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
index 06a653d..d4a9af2 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java
@@ -29,7 +29,7 @@ import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
-import org.apache.accumulo.core.replication.ReplicationConstants;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.metrics.AbstractMetricsImpl;
@@ -61,7 +61,7 @@ public class ReplicationMetrics extends AbstractMetricsImpl 
implements Replicati
 
   @Override
   public int getNumFilesPendingReplication() {
-    if (!tops.exists(ReplicationConstants.TABLE_NAME)) {
+    if (!tops.exists(ReplicationTable.NAME)) {
       return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index 3810372..8e1d036 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -32,13 +32,13 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import 
org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.replication.WorkAssigner;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -252,14 +252,18 @@ public abstract class DistributedWorkQueueWorkAssigner 
implements WorkAssigner {
 
   /**
    * Queue the given work for the target
-   * @param path File to replicate
-   * @param target Target for the work
+   *
+   * @param path
+   *          File to replicate
+   * @param target
+   *          Target for the work
    * @return True if the work was queued, false otherwise
    */
   protected abstract boolean queueWork(Path path, ReplicationTarget target);
 
   /**
-   * @param target Target for the work
+   * @param target
+   *          Target for the work
    * @return Queued work for the given target
    */
   protected abstract Set<String> getQueuedWork(ReplicationTarget target);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index b816eab..8bc18eb 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -37,10 +37,10 @@ import 
org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Update the status record in the replication table with work
- * that has been replicated to each configured peer.
+ * Update the status record in the replication table with work that has been 
replicated to each configured peer.
  */
 public class FinishedWorkUpdater implements Runnable {
   private static final Logger log = 
LoggerFactory.getLogger(FinishedWorkUpdater.class);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index c670d51..ef76916 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -39,10 +39,10 @@ import 
org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,7 +182,7 @@ public class RemoveCompleteReplicationRecords implements 
Runnable {
         Long timeClosed = tableToTimeCreated.get(tableId);
         if (null == timeClosed) {
           tableToTimeCreated.put(tableId, status.getCreatedTime());
-        } else if (timeClosed != status.getCreatedTime()){
+        } else if (timeClosed != status.getCreatedTime()) {
           log.warn("Found multiple values for timeClosed for {}: {} and {}", 
row, timeClosed, status.getCreatedTime());
         }
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 68dd005..c0f2a6c 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -33,9 +33,10 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.io.Text;
@@ -45,8 +46,8 @@ import org.slf4j.LoggerFactory;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
- * Reads replication records from the metadata table and creates status 
records in the replication table. Deletes the record
- * from the metadata table when it's closed.
+ * Reads replication records from the metadata table and creates status 
records in the replication table. Deletes the record from the metadata table 
when it's
+ * closed.
  */
 public class StatusMaker {
   private static final Logger log = LoggerFactory.getLogger(StatusMaker.class);
@@ -64,7 +65,7 @@ public class StatusMaker {
    * Not for public use -- visible only for testing
    * <p>
    * Used to read records from a table other than 'metadata'
-   * 
+   *
    * @param table
    *          The table to read from
    */
@@ -92,7 +93,7 @@ public class StatusMaker {
         // Get a writer to the replication table
         if (null == replicationWriter) {
           // Ensures table exists and is properly configured
-          ReplicationTable.create(conn);
+          ReplicationUtil.createReplicationTable(conn);
           try {
             setBatchWriter(ReplicationTable.getBatchWriter(conn));
           } catch (TableNotFoundException e) {
@@ -105,7 +106,6 @@ public class StatusMaker {
         MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
         MetadataSchema.ReplicationSection.getTableId(entry.getKey(), tableId);
 
-
         Status status;
         try {
           status = Status.parseFrom(entry.getValue().get());
@@ -180,13 +180,17 @@ public class StatusMaker {
   }
 
   /**
-   * Create a record to track when the file was closed to ensure that 
replication preference
-   * is given to files that have been closed the longest and allow the work 
assigner to try to
-   * replicate in order that data was ingested (avoid replay in different 
order)
-   * @param file File being replicated
-   * @param tableId Table ID the file was used by
-   * @param stat Status msg
-   * @param value Serialized version of the Status msg
+   * Create a record to track when the file was closed to ensure that 
replication preference is given to files that have been closed the longest and 
allow the
+   * work assigner to try to replicate in order that data was ingested (avoid 
replay in different order)
+   *
+   * @param file
+   *          File being replicated
+   * @param tableId
+   *          Table ID the file was used by
+   * @param stat
+   *          Status msg
+   * @param value
+   *          Serialized version of the Status msg
    */
   protected boolean addOrderRecord(Text file, Text tableId, Status stat, Value 
value) {
     try {
@@ -219,15 +223,14 @@ public class StatusMaker {
   }
 
   /**
-   * Because there is only one active Master, and thus one active StatusMaker, 
the only
-   * safe time that we can issue the delete for a Status which is closed is 
immediately
-   * after writing it to the replication table.
+   * Because there is only one active Master, and thus one active StatusMaker, 
the only safe time that we can issue the delete for a Status which is closed is
+   * immediately after writing it to the replication table.
    * <p>
-   * If we try to defer and delete these entries in another thread/process, we 
will have
-   * no assurance that the Status message was propagated to the replication 
table. It is
-   * easiest, in terms of concurrency, to do this all in one step.
-   * 
-   * @param k The Key to delete
+   * If we try to defer and delete these entries in another thread/process, we 
will have no assurance that the Status message was propagated to the replication
+   * table. It is easiest, in terms of concurrency, to do this all in one step.
+   *
+   * @param k
+   *          The Key to delete
    */
   protected void deleteStatusRecord(Key k) {
     log.debug("Deleting {} from metadata table as it's no longer needed", 
k.toStringNoTruncate());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index eabcc84..95f607f 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -33,12 +33,12 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.replication.ReplicationSchema;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.io.DataOutputBuffer;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
index 7633823..f8cc775 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
@@ -31,9 +31,10 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -43,7 +44,7 @@ import org.junit.rules.TestName;
 import com.google.common.collect.Iterables;
 
 /**
- * 
+ *
  */
 public class FinishedWorkUpdaterTest {
 
@@ -67,7 +68,7 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void recordsWithProgressUpdateBothTables() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
     Status stat = 
Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
@@ -97,15 +98,17 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void chooseMinimumBeginOffset() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    // @formatter:off
     Status stat1 = 
Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
         stat2 = 
Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
         stat3 = 
Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
     ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
         target2 = new ReplicationTarget("peer2", "table2", "1"),
         target3 = new ReplicationTarget("peer3", "table3", "1");
+    // @formatter:on
 
     // Create a single work record for a file to some peer
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
@@ -133,15 +136,17 @@ public class FinishedWorkUpdaterTest {
 
   @Test
   public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
 
     String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    // @formatter:off
     Status stat1 = 
Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
         stat2 = 
Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
         stat3 = 
Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
     ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
         target2 = new ReplicationTarget("peer2", "table2", "1"),
         target3 = new ReplicationTarget("peer3", "table3", "1");
+    // @formatter:on
 
     // Create a single work record for a file to some peer
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index ec19a94..2255e21 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -37,10 +37,11 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.io.Text;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -72,7 +73,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void notYetReplicationRecordsIgnored() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
     for (int i = 0; i < numRecords; i++) {
@@ -102,7 +103,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void partiallyReplicatedRecordsIgnored() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
     Status.Builder builder = Status.newBuilder();
@@ -137,7 +138,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void 
replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws 
Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 
@@ -190,7 +191,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void replicatedClosedRowsAreRemoved() throws Exception {
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 
@@ -282,7 +283,7 @@ public class RemoveCompleteReplicationRecordsTest {
 
   @Test
   public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception 
{
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
     int numRecords = 3;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 3d96ea1..d5bcfbd 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -37,13 +37,14 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import 
org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
@@ -85,7 +86,7 @@ public class SequentialWorkAssignerTest {
     assigner.setConnector(conn);
 
     // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -158,7 +159,7 @@ public class SequentialWorkAssignerTest {
     assigner.setConnector(conn);
 
     // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -238,7 +239,7 @@ public class SequentialWorkAssignerTest {
     assigner.setConnector(conn);
 
     // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done
@@ -359,7 +360,7 @@ public class SequentialWorkAssignerTest {
     assigner.setConnector(conn);
 
     // Create and grant ourselves write to the replication table
-    ReplicationTable.create(conn);
+    ReplicationUtil.createReplicationTable(conn);
     conn.securityOperations().grantTablePermission("root", 
ReplicationTable.NAME, TablePermission.WRITE);
 
     // Create two mutations, both of which need replication work done

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f1a47b38/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git 
a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
 
b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
index 966e2bd..6a8fa4d 100644
--- 
a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ 
b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
@@ -38,11 +38,11 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;

Reply via email to