ACCUMULO-3423 updates from reviews
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9c2ca7a5 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9c2ca7a5 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9c2ca7a5 Branch: refs/heads/master Commit: 9c2ca7a5cce8f7d0bacb100fed6405c86bde2a2d Parents: affff42 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Tue Apr 14 14:52:07 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Tue Apr 14 14:52:07 2015 -0400 ---------------------------------------------------------------------- .../core/tabletserver/log/LogEntry.java | 4 +++ .../server/master/state/MetaDataStateStore.java | 3 ++ .../accumulo/server/replication/StatusUtil.java | 12 +++++--- .../gc/GarbageCollectWriteAheadLogs.java | 30 ++++++++++++++------ .../apache/accumulo/tserver/TabletServer.java | 4 +-- .../apache/accumulo/tserver/log/DfsLogger.java | 1 + .../tserver/log/TabletServerLogger.java | 10 +++++-- 7 files changed, 46 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java index 964e3b3..90ce692 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java @@ -81,6 +81,10 @@ public class LogEntry { static private final Text EMPTY_TEXT = new Text(); public static LogEntry fromKeyValue(Key key, Value value) { + String qualifier = key.getColumnQualifier().toString(); + if (qualifier.indexOf('/') < 1) { + throw new IllegalArgumentException("Bad key for log entry: " + key); + } KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT); String[] parts = key.getColumnQualifier().toString().split("/", 2); String server = parts[0]; http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index adcf04d..c154bd0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -168,6 +168,9 @@ public class MetaDataStateStore extends TabletStateStore { BatchWriter writer = createBatchWriter(); try { for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) { + if (entry.getValue().isEmpty()) { + continue; + } Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString()); for (Path log : entry.getValue()) { m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java index e973ebc..d72eea2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java @@ -32,7 +32,6 @@ public class StatusUtil { private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE; private static final Status.Builder CREATED_STATUS_BUILDER; - private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER; static { CREATED_STATUS_BUILDER = Status.newBuilder(); @@ -46,7 +45,6 @@ public class StatusUtil { builder.setEnd(0); builder.setInfiniteEnd(true); builder.setClosed(false); - INF_END_REPLICATION_STATUS_BUILDER = builder; INF_END_REPLICATION_STATUS = builder.build(); INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS); @@ -155,8 +153,14 @@ public class StatusUtil { /** * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. */ - public static synchronized Status openWithUnknownLength(long timeCreated) { - return INF_END_REPLICATION_STATUS_BUILDER.setCreatedTime(timeCreated).build(); + public static Status openWithUnknownLength(long timeCreated) { + Builder builder = Status.newBuilder(); + builder.setBegin(0); + builder.setEnd(0); + builder.setInfiniteEnd(true); + builder.setClosed(false); + builder.setCreatedTime(timeCreated); + return builder.build(); } /** http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 59612ab..9f537af 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.UUID; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -130,9 +131,9 @@ public class GarbageCollectWriteAheadLogs { status.currentLog.candidates = count; span.stop(); - span = Trace.start("removeMetadataEntries"); + span = Trace.start("removeEntriesInUse"); try { - count = removeMetadataEntries(candidates, status, currentServers); + count = removeEntriesInUse(candidates, status, currentServers); } catch (Exception ex) { log.error("Unable to scan metadata table", ex); return; @@ -165,7 +166,7 @@ public class GarbageCollectWriteAheadLogs { span.stop(); span = Trace.start("removeMarkers"); - count = removeMarkers(candidates); + count = removeTabletServerMarkers(candidates); long removeMarkersStop = System.currentTimeMillis(); log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.)); span.stop(); @@ -182,7 +183,7 @@ public class GarbageCollectWriteAheadLogs { } } - private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) { + private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) { long result = 0; try { BatchWriter root = null; @@ -231,15 +232,19 @@ public class GarbageCollectWriteAheadLogs { return status.currentLog.deleted; } - private long removeMetadataEntries(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException, + private UUID path2uuid(Path path) { + return UUID.fromString(path.getName()); + } + + private long removeEntriesInUse(Map<TServerInstance, Set<Path> > candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException, KeeperException, InterruptedException { // remove any entries if there's a log reference, or a tablet is still assigned to the dead server - Map<Path, TServerInstance> walToDeadServer = new HashMap<>(); + Map<UUID, TServerInstance> walToDeadServer = new HashMap<>(); for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) { for (Path file : entry.getValue()) { - walToDeadServer.put(file, entry.getKey()); + walToDeadServer.put(path2uuid(file), entry.getKey()); } } long count = 0; @@ -254,9 +259,16 @@ public class GarbageCollectWriteAheadLogs { } for (Collection<String> wals : state.walogs) { for (String wal : wals) { - TServerInstance dead = walToDeadServer.get(new Path(wal)); + UUID walUUID = path2uuid(new Path(wal)); + TServerInstance dead = walToDeadServer.get(walUUID); if (dead != null) { - candidates.get(dead).remove(wal); + Iterator<Path> iter = candidates.get(dead).iterator(); + while (iter.hasNext()) { + if (path2uuid(iter.next()).equals(walUUID)) { + iter.remove(); + break; + } + } } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index e28d472..af45c14 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2893,7 +2893,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException { totalMinorCompactions.incrementAndGet(); logger.minorCompactionFinished(tablet, newDatafile, walogSeq); - removeUnusedWALs(); + markUnusedWALs(); } public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException { @@ -3000,7 +3000,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { // remove any meta entries after a rolled log is no longer referenced Set<DfsLogger> closedLogs = new HashSet<>(); - private void removeUnusedWALs() { + private void markUnusedWALs() { Set<DfsLogger> candidates; synchronized (closedLogs) { candidates = new HashSet<>(closedLogs); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index e256604..6e9cdf9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -383,6 +383,7 @@ public class DfsLogger implements Comparable<DfsLogger> { public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); + log.debug("Address is " + address); String logger = Joiner.on("+").join(address.split(":")); log.debug("DfsLogger.open() begin"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9c2ca7a5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 04e7a83..dd54798 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -79,7 +79,7 @@ public class TabletServerLogger { // The current logger private DfsLogger currentLog = null; private final SynchronousQueue<DfsLogger> nextLog = new SynchronousQueue<>(); - private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator"); + private ThreadPoolExecutor nextLogMaker; // The current generation of logs. // Because multiple threads can be using a log at one time, a log @@ -150,7 +150,6 @@ public class TabletServerLogger { this.maxSize = maxSize; this.syncCounter = syncCounter; this.flushCounter = flushCounter; - startLogMaker(); } private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { @@ -200,6 +199,7 @@ public class TabletServerLogger { } try { + startLogMaker(); DfsLogger next = nextLog.take(); log.info("Using next log " + next.getFileName()); currentLog = next; @@ -214,7 +214,11 @@ public class TabletServerLogger { } } - private void startLogMaker() { + private synchronized void startLogMaker() { + if (nextLogMaker != null) { + return; + } + nextLogMaker = new SimpleThreadPool(1, "WALog creator"); nextLogMaker.submit(new Runnable() { @Override public void run() {