This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new eff92f1 Cleanup TabletServerLogger code (#793) eff92f1 is described below commit eff92f178b01786c4debcc9f927c8cd9a84103a7 Author: Mike Miller <mmil...@apache.org> AuthorDate: Wed Dec 12 17:57:50 2018 -0500 Cleanup TabletServerLogger code (#793) * Remove unnecessary object manipulation across method calls * Remove unused methods * Replace code with lambdas * Removed Mutations class that is no longer needed * Removed extra loops for checking durability by creating maxDurability method * Make Tserver not log mutation when durability is none * Simplify sendables object that calls commit on mutations --- .../org/apache/accumulo/tserver/Mutations.java | 40 ------- .../org/apache/accumulo/tserver/TabletServer.java | 82 +++++++------- .../org/apache/accumulo/tserver/log/DfsLogger.java | 24 ++-- .../accumulo/tserver/log/TabletServerLogger.java | 122 +++++---------------- .../apache/accumulo/tserver/log/DfsLoggerTest.java | 25 +++-- 5 files changed, 102 insertions(+), 191 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java deleted file mode 100644 index 76061e6..0000000 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Mutations.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver; - -import java.util.List; - -import org.apache.accumulo.core.client.Durability; -import org.apache.accumulo.core.data.Mutation; - -public class Mutations { - private final Durability durability; - private final List<Mutation> mutations; - - Mutations(Durability durability, List<Mutation> mutations) { - this.durability = durability; - this.mutations = mutations; - } - - public Durability getDurability() { - return durability; - } - - public List<Mutation> getMutations() { - return mutations; - } -} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 51962bd..97cdc9b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -1013,7 +1013,8 @@ public class TabletServer implements Runnable { private void flush(UpdateSession us) { int mutationCount = 0; - Map<CommitSession,Mutations> sendables = new HashMap<>(); + Map<CommitSession,List<Mutation>> sendables = new HashMap<>(); + Map<CommitSession,TabletMutations> loggables = new HashMap<>(); Throwable error = null; long pt1 = System.currentTimeMillis(); @@ -1031,7 +1032,8 @@ public class TabletServer implements Runnable { for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) { Tablet tablet = entry.getKey(); - Durability tabletDurability = tablet.getDurability(); + Durability durability = DurabilityImpl.resolveDurabilty(us.durability, + tablet.getDurability()); List<Mutation> mutations = entry.getValue(); if (mutations.size() > 0) { try { @@ -1045,8 +1047,11 @@ public class TabletServer implements Runnable { } us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); } else { - sendables.put(commitSession, new Mutations( - DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations)); + if (durability != Durability.NONE) { + loggables.put(commitSession, new TabletMutations(commitSession.getLogId(), + commitSession.getWALogSeq(), mutations, durability)); + } + sendables.put(commitSession, mutations); mutationCount += mutations.size(); } @@ -1059,9 +1064,12 @@ public class TabletServer implements Runnable { // only log and commit mutations if there were some // that did not violate constraints... this is what // prepareMutationsForCommit() expects - sendables.put(e.getCommitSession(), - new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), - e.getNonViolators())); + CommitSession cs = e.getCommitSession(); + if (durability != Durability.NONE) { + loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), + e.getNonViolators(), durability)); + } + sendables.put(cs, e.getNonViolators()); } mutationCount += mutations.size(); @@ -1082,9 +1090,7 @@ public class TabletServer implements Runnable { updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size()); if (error != null) { - for (Entry<CommitSession,Mutations> e : sendables.entrySet()) { - e.getKey().abortCommit(e.getValue().getMutations()); - } + sendables.forEach(CommitSession::abortCommit); throw new RuntimeException(error); } try { @@ -1094,7 +1100,7 @@ public class TabletServer implements Runnable { try { long t1 = System.currentTimeMillis(); - logger.logManyTablets(sendables); + logger.logManyTablets(loggables); long t2 = System.currentTimeMillis(); us.walogTimes.addStat(t2 - t1); @@ -1115,12 +1121,8 @@ public class TabletServer implements Runnable { Span commit = Trace.start("commit"); try { long t1 = System.currentTimeMillis(); - for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) { - CommitSession commitSession = entry.getKey(); - List<Mutation> mutations = entry.getValue().getMutations(); - + sendables.forEach((commitSession, mutations) -> { commitSession.commit(mutations); - KeyExtent extent = commitSession.getExtent(); if (us.currentTablet != null && extent == us.currentTablet.getExtent()) { @@ -1131,7 +1133,7 @@ public class TabletServer implements Runnable { us.successfulCommits.increment(us.currentTablet, us.queuedMutations.get(us.currentTablet).size()); } - } + }); long t2 = System.currentTimeMillis(); us.flushTime += (t2 - pt1); @@ -1266,12 +1268,14 @@ public class TabletServer implements Runnable { throw new NotServingTabletException(tkeyExtent); } - while (true) { + Durability durability = DurabilityImpl + .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability); + // instead of always looping on true, skip completely when durability is NONE + while (durability != Durability.NONE) { try { final Span wal = Trace.start("wal"); try { - logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl - .resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability)); + logger.log(cs, cs.getWALogSeq(), mutation, durability); } finally { wal.stop(); } @@ -1356,7 +1360,8 @@ public class TabletServer implements Runnable { ArrayList<TCMResult> results, ConditionalSession sess) { Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet(); - Map<CommitSession,Mutations> sendables = new HashMap<>(); + Map<CommitSession,List<Mutation>> sendables = new HashMap<>(); + Map<CommitSession,TabletMutations> loggables = new HashMap<>(); boolean sessionCanceled = sess.interruptFlag.get(); @@ -1369,7 +1374,8 @@ public class TabletServer implements Runnable { for (ServerConditionalMutation scm : entry.getValue()) results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); } else { - final Durability tabletDurability = tablet.getDurability(); + final Durability durability = DurabilityImpl.resolveDurabilty(sess.durability, + tablet.getDurability()); try { @SuppressWarnings("unchecked") @@ -1386,18 +1392,21 @@ public class TabletServer implements Runnable { } else { for (ServerConditionalMutation scm : entry.getValue()) results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED)); - sendables.put(cs, - new Mutations( - DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability), - mutations)); + if (durability != Durability.NONE) { + loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), + mutations, durability)); + } + sendables.put(cs, mutations); } } } catch (TConstraintViolationException e) { + CommitSession cs = e.getCommitSession(); if (e.getNonViolators().size() > 0) { - sendables.put(e.getCommitSession(), - new Mutations( - DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability), - e.getNonViolators())); + if (durability != Durability.NONE) { + loggables.put(cs, new TabletMutations(cs.getLogId(), cs.getWALogSeq(), + e.getNonViolators(), durability)); + } + sendables.put(cs, e.getNonViolators()); for (Mutation m : e.getNonViolators()) results.add( new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED)); @@ -1418,10 +1427,10 @@ public class TabletServer implements Runnable { Span walSpan = Trace.start("wal"); try { - while (sendables.size() > 0) { + while (loggables.size() > 0) { try { long t1 = System.currentTimeMillis(); - logger.logManyTablets(sendables); + logger.logManyTablets(loggables); long t2 = System.currentTimeMillis(); updateWalogWriteTime(t2 - t1); break; @@ -1440,12 +1449,7 @@ public class TabletServer implements Runnable { Span commitSpan = Trace.start("commit"); try { long t1 = System.currentTimeMillis(); - for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) { - CommitSession commitSession = entry.getKey(); - List<Mutation> mutations = entry.getValue().getMutations(); - - commitSession.commit(mutations); - } + sendables.forEach(CommitSession::commit); long t2 = System.currentTimeMillis(); updateAvgCommitTime(t2 - t1, sendables.size()); } finally { @@ -3389,7 +3393,7 @@ public class TabletServer implements Runnable { "Unable to find recovery files for extent " + extent + " logEntry: " + entry); recoveryLogs.add(recovery); } - logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver); + logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver); } public int createLogId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index cdb787f..a0bebab 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -31,6 +31,7 @@ import java.lang.reflect.Method; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -597,7 +598,7 @@ public class DfsLogger implements Comparable<DfsLogger> { return new LoggerOperation(work); } - public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException { + public LoggerOperation logManyTablets(Collection<TabletMutations> mutations) throws IOException { Durability durability = Durability.NONE; List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<>(); for (TabletMutations tabletMutations : mutations) { @@ -608,21 +609,20 @@ public class DfsLogger implements Comparable<DfsLogger> { LogFileValue value = new LogFileValue(); value.mutations = tabletMutations.getMutations(); data.add(new Pair<>(key, value)); - if (tabletMutations.getDurability().ordinal() > durability.ordinal()) { - durability = tabletMutations.getDurability(); - } + durability = maxDurability(tabletMutations.getDurability(), durability); } - return logFileData(data, chooseDurabilityForGroupCommit(mutations)); + return logFileData(data, durability); } - static Durability chooseDurabilityForGroupCommit(List<TabletMutations> mutations) { - Durability result = Durability.NONE; - for (TabletMutations tabletMutations : mutations) { - if (tabletMutations.getDurability().ordinal() > result.ordinal()) { - result = tabletMutations.getDurability(); - } + /** + * Return the Durability with the highest precedence + */ + static Durability maxDurability(Durability dur1, Durability dur2) { + if (dur1.ordinal() > dur2.ordinal()) { + return dur1; + } else { + return dur2; } - return result; } public LoggerOperation minorCompactionFinished(long seq, int tid, String fqfn, diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 4f70880..1a1bfa2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -16,15 +16,13 @@ */ package org.apache.accumulo.tserver.log; +import static java.util.Collections.singletonList; + import java.io.IOException; import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -44,13 +42,11 @@ import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.fate.util.LoggingRunnable; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.fate.util.Retry.RetryFactory; -import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.replication.StatusUtil; import org.apache.accumulo.server.replication.proto.Replication.Status; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.ReplicationTableUtil; -import org.apache.accumulo.tserver.Mutations; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; @@ -320,15 +316,6 @@ public class TabletServerLogger { })); } - public void resetLoggers() throws IOException { - logIdLock.writeLock().lock(); - try { - close(); - } finally { - logIdLock.writeLock().unlock(); - } - } - private synchronized void close() throws IOException { if (!logIdLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("close should be called with write lock held!"); @@ -356,22 +343,6 @@ public class TabletServerLogger { LoggerOperation write(DfsLogger logger) throws Exception; } - private void write(CommitSession commitSession, boolean mincFinish, Writer writer) - throws IOException { - write(commitSession, mincFinish, writer, writeRetryFactory.createRetry()); - } - - private void write(CommitSession commitSession, boolean mincFinish, Writer writer, - Retry writeRetry) throws IOException { - List<CommitSession> sessions = Collections.singletonList(commitSession); - write(sessions, mincFinish, writer, writeRetry); - } - - private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) - throws IOException { - write(sessions, mincFinish, writer, writeRetryFactory.createRetry()); - } - private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer, Retry writeRetry) throws IOException { // Work very hard not to lock this during calls to the outside world @@ -454,7 +425,6 @@ public class TabletServerLogger { @Override void withWriteLock() throws IOException { close(); - closeForReplication(sessions); } }); } @@ -471,70 +441,44 @@ public class TabletServerLogger { @Override void withWriteLock() throws IOException { close(); - closeForReplication(sessions); } }); } - protected void closeForReplication(Collection<CommitSession> sessions) { - // TODO We can close the WAL here for replication purposes - } - public void defineTablet(final CommitSession commitSession, final Retry writeRetry) throws IOException { // scribble this into the metadata tablet, too. - write(commitSession, false, new Writer() { - @Override - public LoggerOperation write(DfsLogger logger) throws Exception { - logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), - commitSession.getExtent()); - return DfsLogger.NO_WAIT_LOGGER_OP; - } + write(singletonList(commitSession), false, logger -> { + logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), + commitSession.getExtent()); + return DfsLogger.NO_WAIT_LOGGER_OP; }, writeRetry); } + /** + * Log a single mutation. This method expects mutations that have a durability other than NONE. + */ public void log(final CommitSession commitSession, final long tabletSeq, final Mutation m, final Durability durability) throws IOException { - if (durability == Durability.NONE) { - return; - } - if (durability == Durability.DEFAULT) { + if (durability == Durability.DEFAULT || durability == Durability.NONE) { throw new IllegalArgumentException("Unexpected durability " + durability); } - write(commitSession, false, new Writer() { - @Override - public LoggerOperation write(DfsLogger logger) throws Exception { - return logger.log(tabletSeq, commitSession.getLogId(), m, durability); - } - }); + write(singletonList(commitSession), false, + logger -> logger.log(tabletSeq, commitSession.getLogId(), m, durability), + writeRetryFactory.createRetry()); logSizeEstimate.addAndGet(m.numBytes()); } - public void logManyTablets(Map<CommitSession,Mutations> mutations) throws IOException { - - final Map<CommitSession,Mutations> loggables = new HashMap<>(mutations); - for (Entry<CommitSession,Mutations> entry : mutations.entrySet()) { - if (entry.getValue().getDurability() == Durability.NONE) { - loggables.remove(entry.getKey()); - } - } + /** + * Log mutations. This method expects mutations that have a durability other than NONE. + */ + public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws IOException { if (loggables.size() == 0) return; - write(loggables.keySet(), false, new Writer() { - @Override - public LoggerOperation write(DfsLogger logger) throws Exception { - List<TabletMutations> copy = new ArrayList<>(loggables.size()); - for (Entry<CommitSession,Mutations> entry : loggables.entrySet()) { - CommitSession cs = entry.getKey(); - Durability durability = entry.getValue().getDurability(); - copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), - entry.getValue().getMutations(), durability)); - } - return logger.logManyTablets(copy); - } - }); - for (Mutations entry : loggables.values()) { + write(loggables.keySet(), false, logger -> logger.logManyTablets(loggables.values()), + writeRetryFactory.createRetry()); + for (TabletMutations entry : loggables.values()) { if (entry.getMutations().size() < 1) { throw new IllegalArgumentException("logManyTablets: logging empty mutation list"); } @@ -550,13 +494,10 @@ public class TabletServerLogger { long t1 = System.currentTimeMillis(); - write(commitSession, true, new Writer() { - @Override - public LoggerOperation write(DfsLogger logger) throws Exception { - return logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), - fullyQualifiedFileName, durability); - } - }); + write( + singletonList(commitSession), true, logger -> logger.minorCompactionFinished(walogSeq, + commitSession.getLogId(), fullyQualifiedFileName, durability), + writeRetryFactory.createRetry()); long t2 = System.currentTimeMillis(); @@ -565,18 +506,15 @@ public class TabletServerLogger { public long minorCompactionStarted(final CommitSession commitSession, final long seq, final String fullyQualifiedFileName, final Durability durability) throws IOException { - write(commitSession, false, new Writer() { - @Override - public LoggerOperation write(DfsLogger logger) throws Exception { - return logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName, - durability); - } - }); + write( + singletonList(commitSession), false, logger -> logger.minorCompactionStarted(seq, + commitSession.getLogId(), fullyQualifiedFileName, durability), + writeRetryFactory.createRetry()); return seq; } - public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List<Path> logs, - Set<String> tabletFiles, MutationReceiver mr) throws IOException { + public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles, + MutationReceiver mr) throws IOException { try { SortedLogRecovery recovery = new SortedLogRecovery(fs); recovery.recover(extent, logs, tabletFiles, mr); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java index 0291747..80a6ff5 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/DfsLoggerTest.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.log; import static org.junit.Assert.assertEquals; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -31,28 +32,36 @@ public class DfsLoggerTest { @Test public void testDurabilityForGroupCommit() { List<TabletMutations> lst = new ArrayList<>(); - assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst)); TabletMutations m1 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE); lst.add(m1); - assertEquals(Durability.NONE, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.NONE, chooseDurabilityForGroupCommit(lst)); TabletMutations m2 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG); lst.add(m2); - assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst)); TabletMutations m3 = new TabletMutations(0, 1, Collections.emptyList(), Durability.NONE); lst.add(m3); - assertEquals(Durability.LOG, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.LOG, chooseDurabilityForGroupCommit(lst)); TabletMutations m4 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH); lst.add(m4); - assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst)); TabletMutations m5 = new TabletMutations(0, 1, Collections.emptyList(), Durability.LOG); lst.add(m5); - assertEquals(Durability.FLUSH, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.FLUSH, chooseDurabilityForGroupCommit(lst)); TabletMutations m6 = new TabletMutations(0, 1, Collections.emptyList(), Durability.SYNC); lst.add(m6); - assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst)); TabletMutations m7 = new TabletMutations(0, 1, Collections.emptyList(), Durability.FLUSH); lst.add(m7); - assertEquals(Durability.SYNC, DfsLogger.chooseDurabilityForGroupCommit(lst)); + assertEquals(Durability.SYNC, chooseDurabilityForGroupCommit(lst)); + } + + static Durability chooseDurabilityForGroupCommit(Collection<TabletMutations> mutations) { + Durability result = Durability.NONE; + for (TabletMutations tabletMutations : mutations) { + result = DfsLogger.maxDurability(tabletMutations.getDurability(), result); + } + return result; } }