This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 56b21ca More cleanup for WAL (#842) 56b21ca is described below commit 56b21ca4395b54e525850437d70e7ec28568d2b3 Author: Mike Miller <mmil...@apache.org> AuthorDate: Fri Dec 28 12:31:16 2018 -0500 More cleanup for WAL (#842) * Only synchronize on write method * Actually log MUTATION for single mutation * Make TabletMutations constructor take CommitSession * Simplify calls to defineTablet and logFileData * Remove unused datafile path param * Fix slf4j debug statement --- .../apache/accumulo/tserver/TabletMutations.java | 14 ++--- .../org/apache/accumulo/tserver/TabletServer.java | 15 +++--- .../org/apache/accumulo/tserver/log/DfsLogger.java | 63 +++++++++++----------- .../accumulo/tserver/log/TabletServerLogger.java | 32 +++-------- .../apache/accumulo/tserver/log/DfsLoggerTest.java | 24 ++++++--- 5 files changed, 68 insertions(+), 80 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java index 6a65097..32d70fe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletMutations.java @@ -20,16 +20,16 @@ import java.util.List; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.tserver.tablet.CommitSession; public class TabletMutations { - private final int tid; - private final long seq; + private CommitSession commitSession; private final List<Mutation> mutations; private final Durability durability; - public TabletMutations(int tid, long seq, List<Mutation> mutations, Durability durability) { - this.tid = tid; - this.seq = seq; + public TabletMutations(CommitSession commitSession, List<Mutation> mutations, + Durability durability) { + this.commitSession = commitSession; this.mutations = mutations; this.durability = durability; } @@ -39,11 +39,11 @@ public class TabletMutations { } public int getTid() { - return tid; + return commitSession.getLogId(); } public long getSeq() { - return seq; + return commitSession.getWALogSeq(); } public Durability getDurability() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index ee1d455..08c5495 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1045,8 +1045,8 @@ public class TabletServer implements Runnable { us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); } else { if (durability != Durability.NONE) { - loggables.put(commitSession, new TabletMutations(commitSession.getLogId(), - commitSession.getWALogSeq(), mutations, durability)); + loggables.put(commitSession, + new TabletMutations(commitSession, mutations, durability)); } sendables.put(commitSession, mutations); mutationCount += mutations.size(); @@ -1063,8 +1063,7 @@ public class TabletServer implements Runnable { // prepareMutationsForCommit() expects CommitSession cs = e.getCommitSession(); if (durability != Durability.NONE) { - loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), - e.getNonViolators(), durability)); + loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability)); } sendables.put(cs, e.getNonViolators()); } @@ -1272,7 +1271,7 @@ public class TabletServer implements Runnable { try { final Span wal = Trace.start("wal"); try { - logger.log(cs, cs.getWALogSeq(), mutation, durability); + logger.log(cs, mutation, durability); } finally { wal.stop(); } @@ -1390,8 +1389,7 @@ public class TabletServer implements Runnable { for (ServerConditionalMutation scm : entry.getValue()) results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED)); if (durability != Durability.NONE) { - loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), - mutations, durability)); + loggables.put(cs, new TabletMutations(cs, mutations, durability)); } sendables.put(cs, mutations); } @@ -1400,8 +1398,7 @@ public class TabletServer implements Runnable { CommitSession cs = e.getCommitSession(); if (e.getNonViolators().size() > 0) { if (durability != Durability.NONE) { - loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), - e.getNonViolators(), durability)); + loggables.put(cs, new TabletMutations(cs, e.getNonViolators(), durability)); } sendables.put(cs, e.getNonViolators()); for (Mutation m : e.getNonViolators()) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 5f9c0a8..721b0d8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -17,10 +17,12 @@ package org.apache.accumulo.tserver.log; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.singletonList; import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; +import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION; import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; import java.io.DataInputStream; @@ -32,7 +34,6 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -49,7 +50,6 @@ import org.apache.accumulo.core.crypto.streams.NoFlushOutputStream; import org.apache.accumulo.core.cryptoImpl.CryptoEnvironmentImpl; import org.apache.accumulo.core.cryptoImpl.NoCryptoService; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; import org.apache.accumulo.core.spi.crypto.CryptoEnvironment.Scope; import org.apache.accumulo.core.spi.crypto.CryptoService; @@ -66,6 +66,7 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.logger.LogFileKey; import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.accumulo.tserver.tablet.CommitSession; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -450,7 +451,7 @@ public class DfsLogger implements Comparable<DfsLogger> { key.event = OPEN; key.tserverSession = filename; key.filename = filename; - op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC); + op = logKeyData(key, Durability.SYNC); } catch (Exception ex) { if (logFile != null) logFile.close(); @@ -537,22 +538,14 @@ public class DfsLogger implements Comparable<DfsLogger> { } } - public synchronized void defineTablet(long seq, int tid, KeyExtent tablet) throws IOException { + public LoggerOperation defineTablet(CommitSession cs) throws IOException { // write this log to the METADATA table final LogFileKey key = new LogFileKey(); key.event = DEFINE_TABLET; - key.seq = seq; - key.tabletId = tid; - key.tablet = tablet; - try { - write(key, EMPTY); - } catch (ClosedChannelException ex) { - throw new LogClosedException(); - } catch (IllegalArgumentException e) { - log.error("Signature of sync method changed. Accumulo is likely" - + " incompatible with this version of Hadoop."); - throw new RuntimeException(e); - } + key.seq = cs.getWALogSeq(); + key.tabletId = cs.getLogId(); + key.tablet = cs.getExtent(); + return logKeyData(key, Durability.LOG); } private synchronized void write(LogFileKey key, LogFileValue value) throws IOException { @@ -561,26 +554,22 @@ public class DfsLogger implements Comparable<DfsLogger> { encryptingLogFile.flush(); } - public LoggerOperation log(long seq, int tid, Mutation mutation, Durability durability) - throws IOException { - return logManyTablets(Collections.singletonList( - new TabletMutations(tid, seq, Collections.singletonList(mutation), durability))); + private LoggerOperation logKeyData(LogFileKey key, Durability d) throws IOException { + return logFileData(singletonList(new Pair<>(key, EMPTY)), d); } private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys, Durability durability) throws IOException { DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1), durability); - synchronized (DfsLogger.this) { - try { - for (Pair<LogFileKey,LogFileValue> pair : keys) { - write(pair.getFirst(), pair.getSecond()); - } - } catch (ClosedChannelException ex) { - throw new LogClosedException(); - } catch (Exception e) { - log.error("Failed to write log entries", e); - work.exception = e; + try { + for (Pair<LogFileKey,LogFileValue> pair : keys) { + write(pair.getFirst(), pair.getSecond()); } + } catch (ClosedChannelException ex) { + throw new LogClosedException(); + } catch (Exception e) { + log.error("Failed to write log entries", e); + work.exception = e; } if (durability == Durability.LOG) @@ -614,6 +603,16 @@ public class DfsLogger implements Comparable<DfsLogger> { return logFileData(data, durability); } + public LoggerOperation log(CommitSession cs, Mutation m, Durability d) throws IOException { + LogFileKey key = new LogFileKey(); + key.event = MUTATION; + key.seq = cs.getWALogSeq(); + key.tabletId = cs.getLogId(); + LogFileValue value = new LogFileValue(); + value.mutations = singletonList(m); + return logFileData(singletonList(new Pair<>(key, value)), d); + } + /** * Return the Durability with the highest precedence */ @@ -631,7 +630,7 @@ public class DfsLogger implements Comparable<DfsLogger> { key.event = COMPACTION_FINISH; key.seq = seq; key.tabletId = tid; - return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability); + return logKeyData(key, durability); } public LoggerOperation minorCompactionStarted(long seq, int tid, String fqfn, @@ -641,7 +640,7 @@ public class DfsLogger implements Comparable<DfsLogger> { key.seq = seq; key.tabletId = tid; key.filename = fqfn; - return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability); + return logKeyData(key, durability); } public String getLogger() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 6aac02e..5907902 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -275,7 +275,7 @@ public class TabletServerLogger { alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter); alog.open(tserver.getClientAddressString()); String fileName = alog.getFileName(); - log.debug("Created next WAL " + fileName); + log.debug("Created next WAL {}", fileName); tserver.addNewLogMarker(alog); while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { log.info("Our WAL was not used for 12 hours: {}", fileName); @@ -348,9 +348,8 @@ public class TabletServerLogger { while (!success) { try { // get a reference to the loggers that no other thread can touch - DfsLogger copy = null; AtomicInteger currentId = new AtomicInteger(-1); - copy = initializeLoggers(currentId); + DfsLogger copy = initializeLoggers(currentId); currentLogId = currentId.get(); // add the logger to the log set for the memory in the tablet, @@ -361,7 +360,8 @@ public class TabletServerLogger { if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { try { // Scribble out a tablet definition and then write to the metadata table - defineTablet(commitSession, writeRetry); + write(singletonList(commitSession), false, + logger -> logger.defineTablet(commitSession), writeRetry); } finally { commitSession.finishUpdatingLogsUsed(); } @@ -441,26 +441,15 @@ public class TabletServerLogger { }); } - public void defineTablet(final CommitSession commitSession, final Retry writeRetry) - throws IOException { - // scribble this into the metadata tablet, too. - write(singletonList(commitSession), false, logger -> { - logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), - commitSession.getExtent()); - return DfsLogger.NO_WAIT_LOGGER_OP; - }, writeRetry); - } - /** * Log a single mutation. This method expects mutations that have a durability other than NONE. */ - public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m, - final Durability durability) throws IOException { + public void log(final CommitSession commitSession, final Mutation m, final Durability durability) + throws IOException { if (durability == Durability.DEFAULT || durability == Durability.NONE) { throw new IllegalArgumentException("Unexpected durability " + durability); } - write(singletonList(commitSession), false, - logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability), + write(singletonList(commitSession), false, logger -> logger.log(commitSession, m, durability), writeRetryFactory.createRetry()); logSizeEstimate.addAndGet(m.numBytes()); } @@ -486,16 +475,9 @@ public class TabletServerLogger { public void minorCompactionFinished(final CommitSession commitSession, final long walogSeq, final Durability durability) throws IOException { - - long t1 = System.currentTimeMillis(); - write(singletonList(commitSession), true, logger -> logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), durability), writeRetryFactory.createRetry()); - - long t2 = System.currentTimeMillis(); - - log.debug(" wrote MinC finish: writeTime:{}ms durability:{}", (t2 - t1), durability); } public long minorCompactionStarted(final CommitSession commitSession, final long seq, diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java index 80a6ff5..1073e46 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java @@ -25,6 +25,8 @@ import java.util.List; import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.tserver.TabletMutations; +import org.apache.accumulo.tserver.tablet.CommitSession; +import org.easymock.EasyMock; import org.junit.Test; public class DfsLoggerTest { @@ -32,26 +34,34 @@ public class DfsLoggerTest { @Test public void testDurabilityForGroupCommit() { List<TabletMutations> lst = new ArrayList<>(); + CommitSession commitSession = EasyMock.createMock(CommitSession.class); assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst)); - TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE); + TabletMutations m1 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.NONE); lst.add(m1); assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst)); - TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG); + TabletMutations m2 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.LOG); lst.add(m2); assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst)); - TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE); + TabletMutations m3 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.NONE); lst.add(m3); assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst)); - TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH); + TabletMutations m4 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.FLUSH); lst.add(m4); assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst)); - TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG); + TabletMutations m5 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.LOG); lst.add(m5); assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst)); - TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC); + TabletMutations m6 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.SYNC); lst.add(m6); assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst)); - TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH); + TabletMutations m7 = new TabletMutations(commitSession, Collections.emptyList(), + Durability.FLUSH); lst.add(m7); assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst)); }