This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 7b35311b8a27ce3bc3a2c12a8e348ddb7c7cfde6 Author: Mike Miller <[email protected]> AuthorDate: Wed Sep 6 16:44:03 2017 -0400 ACCUMULO-4662 Refactored TableID in replication --- .../master/replication/SequentialWorkAssigner.java | 41 +++++++++--------- .../replication/SequentialWorkAssignerTest.java | 10 ++--- .../test/replication/SequentialWorkAssignerIT.java | 48 +++++++++++----------- 3 files changed, 50 insertions(+), 49 deletions(-) diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java index f273a3f..c07707c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java @@ -25,6 +25,7 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.replication.ReplicationConstants; import org.apache.accumulo.core.replication.ReplicationTarget; @@ -53,7 +54,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { * } */ // @formatter:on - private Map<String,Map<String,String>> queuedWorkByPeerName; + private Map<String,Map<Table.ID,String>> queuedWorkByPeerName; public SequentialWorkAssigner() {} @@ -66,11 +67,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { return NAME; } - protected Map<String,Map<String,String>> getQueuedWork() { + protected Map<String,Map<Table.ID,String>> getQueuedWork() { return queuedWorkByPeerName; } - protected void setQueuedWork(Map<String,Map<String,String>> queuedWork) { + protected void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) { this.queuedWorkByPeerName = queuedWork; } @@ -97,11 +98,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { Entry<String,ReplicationTarget> entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work); String filename = entry.getKey(); String peerName = entry.getValue().getPeerName(); - String sourceTableId = entry.getValue().getSourceTableId().canonicalID(); + Table.ID sourceTableId = entry.getValue().getSourceTableId(); log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName); - Map<String,String> replicationForPeer = queuedWorkByPeerName.get(peerName); + Map<Table.ID,String> replicationForPeer = queuedWorkByPeerName.get(peerName); if (null == replicationForPeer) { replicationForPeer = new HashMap<>(); queuedWorkByPeerName.put(peerName, replicationForPeer); @@ -116,24 +117,24 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { */ @Override protected void cleanupFinishedWork() { - final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator(); + final Iterator<Entry<String,Map<Table.ID,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator(); final String instanceId = conn.getInstance().getInstanceID(); int elementsRemoved = 0; // Check the status of all the work we've queued up while (queuedWork.hasNext()) { // {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...} - Entry<String,Map<String,String>> workForPeer = queuedWork.next(); + Entry<String,Map<Table.ID,String>> workForPeer = queuedWork.next(); // TableID to workKey (filename and ReplicationTarget) - Map<String,String> queuedReplication = workForPeer.getValue(); + Map<Table.ID,String> queuedReplication = workForPeer.getValue(); - Iterator<Entry<String,String>> iter = queuedReplication.entrySet().iterator(); + Iterator<Entry<Table.ID,String>> iter = queuedReplication.entrySet().iterator(); // Loop over every target we need to replicate this file to, removing the target when // the replication task has finished while (iter.hasNext()) { // tableID -> workKey - Entry<String,String> entry = iter.next(); + Entry<Table.ID,String> entry = iter.next(); // Null equates to the work for this target was finished if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + entry.getValue())) { log.debug("Removing {} from work assignment state", entry.getValue()); @@ -153,12 +154,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected boolean shouldQueueWork(ReplicationTarget target) { - Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); + Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); if (null == queuedWorkForPeer) { return true; } - String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID()); + String queuedWork = queuedWorkForPeer.get(target.getSourceTableId()); // If we have no work for the local table to the given peer, submit some! return null == queuedWork; @@ -167,17 +168,17 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected boolean queueWork(Path path, ReplicationTarget target) { String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target); - Map<String,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); + Map<Table.ID,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); if (null == workForPeer) { workForPeer = new HashMap<>(); this.queuedWorkByPeerName.put(target.getPeerName(), workForPeer); } - String queuedWork = workForPeer.get(target.getSourceTableId().canonicalID()); + String queuedWork = workForPeer.get(target.getSourceTableId()); if (null == queuedWork) { try { workQueue.addWork(queueKey, path.toString()); - workForPeer.put(target.getSourceTableId().canonicalID(), queueKey); + workForPeer.put(target.getSourceTableId(), queueKey); } catch (KeeperException | InterruptedException e) { log.warn("Could not queue work for {} to {}", path, target, e); return false; @@ -195,12 +196,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected Set<String> getQueuedWork(ReplicationTarget target) { - Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); + Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); if (null == queuedWorkForPeer) { return Collections.emptySet(); } - String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID()); + String queuedWork = queuedWorkForPeer.get(target.getSourceTableId()); if (null == queuedWork) { return Collections.emptySet(); } else { @@ -210,15 +211,15 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner { @Override protected void removeQueuedWork(ReplicationTarget target, String queueKey) { - Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); + Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName()); if (null == queuedWorkForPeer) { log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName()); return; } - String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID()); + String queuedWork = queuedWorkForPeer.get(target.getSourceTableId()); if (queuedWork.equals(queueKey)) { - queuedWorkForPeer.remove(target.getSourceTableId().canonicalID()); + queuedWorkForPeer.remove(target.getSourceTableId()); } else { log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork); return; diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java index 26a090a..38c63f6 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java @@ -56,12 +56,12 @@ public class SequentialWorkAssignerTest { ZooCache zooCache = createMock(ZooCache.class); Instance inst = createMock(Instance.class); - Map<String,Map<String,String>> queuedWork = new TreeMap<>(); - Map<String,String> cluster1Work = new TreeMap<>(); + Map<String,Map<Table.ID,String>> queuedWork = new TreeMap<>(); + Map<Table.ID,String> cluster1Work = new TreeMap<>(); // Two files for cluster1, one for table '1' and another for table '2' we havce assigned work for - cluster1Work.put("1", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1")))); - cluster1Work.put("2", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2")))); + cluster1Work.put(Table.ID.of("1"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1")))); + cluster1Work.put(Table.ID.of("2"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2")))); queuedWork.put("cluster1", cluster1Work); @@ -90,6 +90,6 @@ public class SequentialWorkAssignerTest { Assert.assertEquals(1, cluster1Work.size()); Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))), - cluster1Work.get("2")); + cluster1Work.get(Table.ID.of("2"))); } } diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java index 2a7b853..7b2055f 100644 --- a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java @@ -62,7 +62,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { } @Override - public void setQueuedWork(Map<String,Map<String,String>> queuedWork) { + public void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) { super.setQueuedWork(queuedWork); } @@ -137,7 +137,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { bw.close(); DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); - Map<String,Map<String,String>> queuedWork = new HashMap<>(); + Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>(); assigner.setQueuedWork(queuedWork); assigner.setWorkQueue(workQueue); assigner.setMaxQueueSize(Integer.MAX_VALUE); @@ -156,10 +156,10 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { Assert.assertEquals(1, queuedWork.size()); Assert.assertTrue(queuedWork.containsKey("cluster1")); - Map<String,String> cluster1Work = queuedWork.get("cluster1"); + Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); - Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId())); } @Test @@ -199,7 +199,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { bw.close(); DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); - Map<String,Map<String,String>> queuedWork = new HashMap<>(); + Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>(); assigner.setQueuedWork(queuedWork); assigner.setWorkQueue(workQueue); assigner.setMaxQueueSize(Integer.MAX_VALUE); @@ -222,13 +222,13 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { Assert.assertEquals(1, queuedWork.size()); Assert.assertTrue(queuedWork.containsKey("cluster1")); - Map<String,String> cluster1Work = queuedWork.get("cluster1"); + Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(2, cluster1Work.size()); - Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); - Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId())); } @Test @@ -268,7 +268,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { bw.close(); DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); - Map<String,Map<String,String>> queuedWork = new HashMap<>(); + Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>(); assigner.setQueuedWork(queuedWork); assigner.setWorkQueue(workQueue); assigner.setMaxQueueSize(Integer.MAX_VALUE); @@ -291,15 +291,15 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { Assert.assertEquals(2, queuedWork.size()); Assert.assertTrue(queuedWork.containsKey("cluster1")); - Map<String,String> cluster1Work = queuedWork.get("cluster1"); + Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); - Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId())); - Map<String,String> cluster2Work = queuedWork.get("cluster2"); + Map<Table.ID,String> cluster2Work = queuedWork.get("cluster2"); Assert.assertEquals(1, cluster2Work.size()); - Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId())); } @Test @@ -338,9 +338,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class); // Treat filename1 as we have already submitted it for replication - Map<String,Map<String,String>> queuedWork = new HashMap<>(); - Map<String,String> queuedWorkForCluster = new HashMap<>(); - queuedWorkForCluster.put(target.getSourceTableId().canonicalID(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target)); + Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>(); + Map<Table.ID,String> queuedWorkForCluster = new HashMap<>(); + queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target)); queuedWork.put("cluster1", queuedWorkForCluster); assigner.setQueuedWork(queuedWork); @@ -361,9 +361,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase { Assert.assertEquals(1, queuedWork.size()); Assert.assertTrue(queuedWork.containsKey("cluster1")); - Map<String,String> cluster1Work = queuedWork.get("cluster1"); + Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1"); Assert.assertEquals(1, cluster1Work.size()); - Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID())); - Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId().canonicalID())); + Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId())); + Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId())); } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
