Updated Branches: refs/heads/master 040f89121 -> ff226a789
ACCUMULO-1605 Added many finally clauses to close spans in the presence of exceptions. I suspect that the extreme depth of the traces is caused by traces that are started and never stopped. This most likely happens in the presence of errors, where exceptions skip over the span.stop calls. In particular, it happens when a file is missing for a compaction. This is known to occur in 1.4 when bulk imports fail. See ACCUMULO-1044. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ff226a78 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ff226a78 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ff226a78 Branch: refs/heads/master Commit: ff226a78995057518ac354e671d90f1bb4c30884 Parents: 040f891 Author: Eric Newton <[email protected]> Authored: Thu Aug 1 10:07:06 2013 -0400 Committer: Eric Newton <[email protected]> Committed: Mon Aug 5 08:19:58 2013 -0400 ---------------------------------------------------------------------- .../accumulo/server/tabletserver/Tablet.java | 105 +++++---- .../server/tabletserver/TabletServer.java | 230 ++++++++++--------- 2 files changed, 178 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff226a78/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java index 0272a2f..427fd33 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java @@ -621,36 +621,39 @@ public class Tablet { TreeSet<FileRef> inUse = new TreeSet<FileRef>(); Span waitForScans = Trace.start("waitForScans"); - synchronized 
(Tablet.this) { - if (blockNewScans) { - if (reservationsBlocked) - throw new IllegalStateException(); + try { + synchronized (Tablet.this) { + if (blockNewScans) { + if (reservationsBlocked) + throw new IllegalStateException(); + + reservationsBlocked = true; + } - reservationsBlocked = true; - } - - for (FileRef path : pathsToWaitFor) { - while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { - try { - Tablet.this.wait(100); - } catch (InterruptedException e) { - log.warn(e, e); + for (FileRef path : pathsToWaitFor) { + while (fileScanReferenceCounts.get(path) > 0 && System.currentTimeMillis() - startTime < maxWaitTime) { + try { + Tablet.this.wait(100); + } catch (InterruptedException e) { + log.warn(e, e); + } } } + + for (FileRef path : pathsToWaitFor) { + if (fileScanReferenceCounts.get(path) > 0) + inUse.add(path); + } + + if (blockNewScans) { + reservationsBlocked = false; + Tablet.this.notifyAll(); + } + } - - for (FileRef path : pathsToWaitFor) { - if (fileScanReferenceCounts.get(path) > 0) - inUse.add(path); - } - - if (blockNewScans) { - reservationsBlocked = false; - Tablet.this.notifyAll(); - } - + } finally { + waitForScans.stop(); } - waitForScans.stop(); return inUse; } @@ -2070,20 +2073,26 @@ public class Tablet { try { Span span = Trace.start("write"); - count = memTable.getNumEntries(); - - DataFileValue dfv = null; - if (mergeFile != null) - dfv = datafileManager.getDatafileSizes().get(mergeFile); - - MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason); - CompactionStats stats = compactor.call(); - - span.stop(); + CompactionStats stats; + try { + count = memTable.getNumEntries(); + + DataFileValue dfv = null; + if (mergeFile != null) + dfv = datafileManager.getDatafileSizes().get(mergeFile); + + MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, 
mincReason); + stats = compactor.call(); + } finally { + span.stop(); + } span = Trace.start("bringOnline"); - datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), - commitSession, flushId); - span.stop(); + try { + datafileManager.bringMinorCompactionOnline(tmpDatafile, newDatafile, mergeFile, new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()), + commitSession, flushId); + } finally { + span.stop(); + } return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()); } catch (Exception E) { failed = true; @@ -3317,18 +3326,18 @@ public class Tablet { // Always trace majC Span span = Trace.on("majorCompaction"); - synchronized (this) { - // check that compaction is still needed - defer to splitting - majorCompactionQueued.remove(reason); - - if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || needsSplit()) { - return null; + try { + synchronized (this) { + // check that compaction is still needed - defer to splitting + majorCompactionQueued.remove(reason); + + if (closing || closed || !needsMajorCompaction(reason) || majorCompactionInProgress || needsSplit()) { + return null; + } + + majorCompactionInProgress = true; } - - majorCompactionInProgress = true; - } - try { majCStats = _majorCompact(reason); if (reason == MajorCompactionReason.CHOP) { MetadataTableUtil.chopped(getExtent(), this.tabletServer.getLock()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/ff226a78/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 6182b27..3216731 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ 
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -1528,56 +1528,58 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu TabletServer.this.resourceManager.waitUntilCommitsAreEnabled(); Span prep = Trace.start("prep"); - for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) { - - Tablet tablet = entry.getKey(); - List<Mutation> mutations = entry.getValue(); - if (mutations.size() > 0) { - try { - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size()); - - CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations); - if (commitSession == null) { - if (us.currentTablet == tablet) { - us.currentTablet = null; + try { + for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) { + + Tablet tablet = entry.getKey(); + List<Mutation> mutations = entry.getValue(); + if (mutations.size() > 0) { + try { + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.mutationArraySize, mutations.size()); + + CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations); + if (commitSession == null) { + if (us.currentTablet == tablet) { + us.currentTablet = null; + } + us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); + } else { + sendables.put(commitSession, mutations); + mutationCount += mutations.size(); } - us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet)); - } else { - sendables.put(commitSession, mutations); + + } catch (TConstraintViolationException e) { + us.violations.add(e.getViolations()); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0); + + if (e.getNonViolators().size() > 0) { + // only log and commit mutations if there were some + // that did not + // violate constraints... 
this is what + // prepareMutationsForCommit() + // expects + sendables.put(e.getCommitSession(), e.getNonViolators()); + } + mutationCount += mutations.size(); + + } catch (HoldTimeoutException t) { + error = t; + log.debug("Giving up on mutations due to a long memory hold time"); + break; + } catch (Throwable t) { + error = t; + log.error("Unexpected error preparing for commit", error); + break; } - - } catch (TConstraintViolationException e) { - us.violations.add(e.getViolations()); - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.constraintViolations, 0); - - if (e.getNonViolators().size() > 0) { - // only log and commit mutations if there were some - // that did not - // violate constraints... this is what - // prepareMutationsForCommit() - // expects - sendables.put(e.getCommitSession(), e.getNonViolators()); - } - - mutationCount += mutations.size(); - - } catch (HoldTimeoutException t) { - error = t; - log.debug("Giving up on mutations due to a long memory hold time"); - break; - } catch (Throwable t) { - error = t; - log.error("Unexpected error preparing for commit", error); - break; } } + } finally { + prep.stop(); } - prep.stop(); - Span wal = Trace.start("wal"); long pt2 = System.currentTimeMillis(); long avgPrepareTime = (long) ((pt2 - pt1) / (double) us.queuedMutations.size()); us.prepareTimes.addStat(pt2 - pt1); @@ -1591,60 +1593,66 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu throw new RuntimeException(error); } try { - while (true) { - try { - long t1 = System.currentTimeMillis(); - - logger.logManyTablets(sendables); - - long t2 = System.currentTimeMillis(); - us.walogTimes.addStat(t2 - t1); - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1)); - - break; - } catch (IOException ex) { - log.warn("logging mutations failed, retrying"); - } catch (FSError ex) { // happens when DFS is localFS - log.warn("logging mutations failed, 
retrying"); - } catch (Throwable t) { - log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); - throw new RuntimeException(t); + Span wal = Trace.start("wal"); + try { + while (true) { + try { + long t1 = System.currentTimeMillis(); + + logger.logManyTablets(sendables); + + long t2 = System.currentTimeMillis(); + us.walogTimes.addStat(t2 - t1); + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.waLogWriteTime, (t2 - t1)); + + break; + } catch (IOException ex) { + log.warn("logging mutations failed, retrying"); + } catch (FSError ex) { // happens when DFS is localFS + log.warn("logging mutations failed, retrying"); + } catch (Throwable t) { + log.error("Unknown exception logging mutations, counts for mutations in flight not decremented!", t); + throw new RuntimeException(t); + } } + } finally { + wal.stop(); } - wal.stop(); - Span commit = Trace.start("commit"); - long t1 = System.currentTimeMillis(); - for (Entry<CommitSession,? extends List<Mutation>> entry : sendables.entrySet()) { - CommitSession commitSession = entry.getKey(); - List<Mutation> mutations = entry.getValue(); + try { + long t1 = System.currentTimeMillis(); + for (Entry<CommitSession,? 
extends List<Mutation>> entry : sendables.entrySet()) { + CommitSession commitSession = entry.getKey(); + List<Mutation> mutations = entry.getValue(); + + commitSession.commit(mutations); + + Tablet tablet = commitSession.getTablet(); + + if (tablet == us.currentTablet) { + // because constraint violations may filter out some + // mutations, for proper + // accounting with the client code, need to increment + // the count based + // on the original number of mutations from the client + // NOT the filtered number + us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size()); + } + } + long t2 = System.currentTimeMillis(); - commitSession.commit(mutations); + long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size()); - Tablet tablet = commitSession.getTablet(); + us.flushTime += (t2 - pt1); + us.commitTimes.addStat(t2 - t1); - if (tablet == us.currentTablet) { - // because constraint violations may filter out some - // mutations, for proper - // accounting with the client code, need to increment - // the count based - // on the original number of mutations from the client - // NOT the filtered number - us.successfulCommits.increment(tablet, us.queuedMutations.get(tablet).size()); - } + if (updateMetrics.isEnabled()) + updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime); + } finally { + commit.stop(); } - long t2 = System.currentTimeMillis(); - - long avgCommitTime = (long) ((t2 - t1) / (double) sendables.size()); - - us.flushTime += (t2 - pt1); - us.commitTimes.addStat(t2 - t1); - - if (updateMetrics.isEnabled()) - updateMetrics.add(TabletServerUpdateMetrics.commitTime, avgCommitTime); - commit.stop(); } finally { us.queuedMutations.clear(); if (us.currentTablet != null) { @@ -1716,8 +1724,12 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu List<Mutation> mutations = Collections.singletonList(mutation); Span prep = Trace.start("prep"); - CommitSession cs = 
tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations); - prep.stop(); + CommitSession cs; + try { + cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials), mutations); + } finally { + prep.stop(); + } if (cs == null) { throw new NotServingTabletException(tkeyExtent); } @@ -1725,8 +1737,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu while (true) { try { Span wal = Trace.start("wal"); - logger.log(cs, cs.getWALogSeq(), mutation); - wal.stop(); + try { + logger.log(cs, cs.getWALogSeq(), mutation); + } finally { + wal.stop(); + } break; } catch (IOException ex) { log.warn(ex, ex); @@ -1734,8 +1749,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } Span commit = Trace.start("commit"); - cs.commit(mutations); - commit.stop(); + try { + cs.commit(mutations); + } finally { + commit.stop(); + } } catch (TConstraintViolationException e) { throw new ConstraintViolationException(Translator.translate(e.getViolations().asList(), Translator.CVST)); } finally { @@ -2913,11 +2931,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu MetadataTableUtil.addLogEntry(SystemCredentials.get(), entry, getLock()); } - private int startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { + private InetSocketAddress startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { ServerAddress sp = TServerUtils.startServer(conf, address, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); this.server = sp.server; - return sp.address.getPort(); + return sp.address; } private String getMasterAddress() { @@ -2953,13 +2971,13 
@@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu ThriftUtil.returnClient(client); } - private int startTabletClientService() throws UnknownHostException { + private InetSocketAddress startTabletClientService() throws UnknownHostException { // start listening for client connection last Iface tch = TraceWrap.service(new ThriftClientHandler()); Processor<Iface> processor = new Processor<Iface>(tch); - int port = startServer(getSystemConfiguration(), clientAddress.getHostName(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); - log.info("port = " + port); - return port; + InetSocketAddress address = startServer(getSystemConfiguration(), clientAddress.getHostName(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); + log.info("address = " + address); + return address; } ZooLock getLock() { @@ -3026,17 +3044,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public void run() { SecurityUtil.serverLogin(); - int clientPort = 0; try { - clientPort = startTabletClientService(); + clientAddress = startTabletClientService(); } catch (UnknownHostException e1) { - log.error("Unable to start tablet client service", e1); - UtilWaitThread.sleep(1000); - } - if (clientPort == 0) { - throw new RuntimeException("Failed to start the tablet client service"); + throw new RuntimeException("Failed to start the tablet client service", e1); } - clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort); announceExistence(); ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
