ACCUMULO-3249 Add some extra logging that was missing, to get a clearer picture.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/24921c72 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/24921c72 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/24921c72 Branch: refs/heads/master Commit: 24921c72745a2908202ca13c084d1ec46aba4950 Parents: 7aca032 Author: Josh Elser <[email protected]> Authored: Tue Oct 21 16:57:49 2014 -0400 Committer: Josh Elser <[email protected]> Committed: Thu Oct 23 11:18:33 2014 -0400 ---------------------------------------------------------------------- .../accumulo/master/replication/FinishedWorkUpdater.java | 4 +++- .../master/replication/RemoveCompleteReplicationRecords.java | 6 +++--- .../accumulo/master/replication/UnorderedWorkAssigner.java | 4 +++- 3 files changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java index 8048e02..b816eab 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java @@ -122,7 +122,7 @@ public class FinishedWorkUpdater implements Runnable { // Find the minimum value for begin (everyone has replicated up to this offset in the file) tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin())); - } + } if (error) { continue; @@ -151,6 +151,8 @@ public class FinishedWorkUpdater implements Runnable { // Make the mutation 
StatusSection.add(replMutation, buffer, serializedUpdatedStatus); + log.debug("Updating replication status entry for {} with {}", serializedRow.getKey().getRow(), updatedStatus); + try { replBw.addMutation(replMutation); } catch (MutationsRejectedException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java index ecf0c9e..c670d51 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java @@ -106,7 +106,7 @@ public class RemoveCompleteReplicationRecords implements Runnable { /** * Removes {@link Status} records read from the given {@code bs} and writes a delete, using the given {@code bw}, when that {@link Status} is fully replicated * and closed, as defined by {@link StatusUtil#isSafeForRemoval(Status)}. 
- * + * * @param conn * A Connector * @param bs @@ -164,6 +164,8 @@ public class RemoveCompleteReplicationRecords implements Runnable { k.getColumnFamily(colf); k.getColumnQualifier(colq); + log.debug("Removing {} {}:{} from replication table", row, colf, colq); + m.putDelete(colf, colq); String tableId; @@ -188,8 +190,6 @@ public class RemoveCompleteReplicationRecords implements Runnable { recordsRemoved++; } - log.info("Removing {} from the replication table", row); - List<Mutation> mutations = new ArrayList<>(); mutations.add(m); for (Entry<String,Long> entry : tableToTimeCreated.entrySet()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/24921c72/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java index 9042e2d..9a28dd4 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/UnorderedWorkAssigner.java @@ -70,6 +70,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { /** * Initialize the queuedWork set with the work already sent out */ + @Override protected void initializeQueuedWork() { if (null != queuedWork) { return; @@ -98,7 +99,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { /** * Distribute the work for the given path with filename - * + * * @param path * Path to the file being replicated * @param target @@ -108,6 +109,7 @@ public class UnorderedWorkAssigner extends DistributedWorkQueueWorkAssigner { protected boolean queueWork(Path path, ReplicationTarget target) { String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), 
target); if (queuedWork.contains(queueKey)) { + log.debug("{} is already queued to be replicated to {}, not re-queueing", path, target); return false; }
