Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Fri Jan 18 22:06:44 2013 @@ -104,6 +104,7 @@ import org.apache.accumulo.core.util.she import org.apache.accumulo.core.util.shell.commands.InfoCommand; import org.apache.accumulo.core.util.shell.commands.InsertCommand; import org.apache.accumulo.core.util.shell.commands.InterpreterCommand; +import org.apache.accumulo.core.util.shell.commands.ListCompactionsCommand; import org.apache.accumulo.core.util.shell.commands.ListIterCommand; import org.apache.accumulo.core.util.shell.commands.ListScansCommand; import org.apache.accumulo.core.util.shell.commands.MaxRowCommand; @@ -293,7 +294,7 @@ public class Shell extends ShellOptions Command[] dataCommands = {new DeleteCommand(), new DeleteManyCommand(), new DeleteRowsCommand(), new EGrepCommand(), new FormatterCommand(), new InterpreterCommand(), new GrepCommand(), new ImportDirectoryCommand(), new InsertCommand(), new MaxRowCommand(), new ScanCommand()}; - Command[] debuggingCommands = {new ClasspathCommand(), new DebugCommand(), new ListScansCommand(), new TraceCommand()}; + Command[] debuggingCommands = {new ClasspathCommand(), new DebugCommand(), new ListScansCommand(), new ListCompactionsCommand(), new TraceCommand()}; Command[] execCommands = {new ExecfileCommand(), new HistoryCommand()}; Command[] exitCommands = {new ByeCommand(), new ExitCommand(), new QuitCommand()}; Command[] helpCommands = {new AboutCommand(), new HelpCommand(), new InfoCommand(), new QuestionCommand()};
Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java?rev=1435371&view=auto ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java (added) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java Fri Jan 18 22:06:44 2013 @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.util.shell.commands; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.ActiveCompaction; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.util.Duration; +/** Iterator over the printable lines of the active-compaction listing: a header line first, then one line per compaction (or a single ERROR line) for each tablet server. */ +class ActiveCompactionIterator implements Iterator<String> { + + private InstanceOperations instanceOps; + private Iterator<String> tsIter; + private Iterator<String> compactionIter; + + private static String maxDecimal(double count) { + if (count < 9.995) + return String.format("%.2f", count); + if (count < 99.95) + return String.format("%.1f", count); + return String.format("%.0f", count); + } + + private static String shortenCount(long count) { + if (count < 1000) + return count + ""; + if (count < 1000000) + return maxDecimal(count / 1000.0) + "K"; + if (count < 1000000000) + return maxDecimal(count / 1000000.0) + "M"; + return maxDecimal(count / 1000000000.0) + "B"; + } + /** Advances through the remaining tablet servers until one yields at least one line (its compactions, oldest first, or an ERROR line) and makes those lines the current batch; the batch is empty once every server is consumed. Ages are compared explicitly because casting a long difference to int can overflow and flip the order. */ + private void readNext() { + final List<String> compactions = new ArrayList<String>(); + + while (tsIter.hasNext()) { + + final String tserver = tsIter.next(); + try { + List<ActiveCompaction> acl = instanceOps.getActiveCompactions(tserver); + + acl = new ArrayList<ActiveCompaction>(acl); + + Collections.sort(acl, new Comparator<ActiveCompaction>() { + @Override + public int compare(ActiveCompaction o1, ActiveCompaction o2) { + return o2.getAge() > o1.getAge() ? 1 : (o2.getAge() < o1.getAge() ? -1 : 0); + } + }); + + for (ActiveCompaction ac : acl) { + String output = ac.getOutputFile(); + int index = output.indexOf("tables"); + if (index >= 0) { + output = output.substring(index + 6); + } + + ac.getIterators(); + + List<String> iterList = new ArrayList<String>(); + Map<String,Map<String,String>> iterOpts = new 
HashMap<String,Map<String,String>>(); + for (IteratorSetting is : ac.getIterators()) { + iterList.add(is.getName() + "=" + is.getPriority() + "," + is.getIteratorClass()); + iterOpts.put(is.getName(), is.getOptions()); + } + + compactions.add(String.format("%21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", tserver, + Duration.format(ac.getAge(), "", "-"), ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()), shortenCount(ac.getEntriesWritten()), + ac.getTable(), ac.getExtent(), ac.getInputFiles(), + output, iterList, iterOpts)); + } + } catch (Exception e) { + compactions.add(tserver + " ERROR " + e.getMessage()); + } + + if (compactions.size() > 0) { + break; + } + } + + compactionIter = compactions.iterator(); + } + /** Starts on the column header line; compaction lines are fetched lazily from the given tablet servers as the iterator is consumed. */ + ActiveCompactionIterator(List<String> tservers, InstanceOperations instanceOps) { + this.instanceOps = instanceOps; + this.tsIter = tservers.iterator(); + + final String header = String.format(" %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s | %-5s | %-35s | %-9s | %s", "TABLET SERVER", "AGE", "TYPE", + "REASON", + "READ", "WROTE", "TABLE", "TABLET", "INPUT", "OUTPUT", "ITERATORS", "ITERATOR OPTIONS"); + + compactionIter = Collections.singletonList(header).iterator(); + } + + @Override + public boolean hasNext() { + return compactionIter.hasNext(); + } + /** Returns the current line and, once the current batch is exhausted, loads the next batch via readNext(). */ + @Override + public String next() { + final String next = compactionIter.next(); + + if (!compactionIter.hasNext()) + readNext(); + + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + +} Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java?rev=1435371&view=auto ============================================================================== --- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java (added) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java Fri Jan 18 22:06:44 2013 @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.util.shell.commands; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.util.shell.Shell; +import org.apache.accumulo.core.util.shell.Shell.Command; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +/** Shell command that prints the compactions currently running on one tablet server (-ts) or, by default, on every tablet server; -np disables pagination of the output. */ +public class ListCompactionsCommand extends Command { + + private Option tserverOption, disablePaginationOpt; + + @Override + public String description() { + return "lists what compactions are currently running in accumulo. 
See the accumulo.core.client.admin.ActiveCompaction javadoc for more information about columns."; + } + + @Override + public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { + + List<String> tservers; + + final InstanceOperations instanceOps = shellState.getConnector().instanceOperations(); + + final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt()); + + if (cl.hasOption(tserverOption.getOpt())) { + tservers = new ArrayList<String>(); + tservers.add(cl.getOptionValue(tserverOption.getOpt())); + } else { + tservers = instanceOps.getTabletServers(); + } + + shellState.printLines(new ActiveCompactionIterator(tservers, instanceOps), paginate); + + return 0; + } + + @Override + public int numArgs() { + return 0; + } + + @Override + public Options getOptions() { + final Options opts = new Options(); + + tserverOption = new Option("ts", "tabletServer", true, "tablet server to list compactions for"); + tserverOption.setArgName("tablet server"); + opts.addOption(tserverOption); + + disablePaginationOpt = new Option("np", "no-pagination", false, "disable pagination of output"); + opts.addOption(disablePaginationOpt); + + return opts; + } + +} Modified: accumulo/trunk/core/src/main/thrift/tabletserver.thrift URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/tabletserver.thrift?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/core/src/main/thrift/tabletserver.thrift (original) +++ accumulo/trunk/core/src/main/thrift/tabletserver.thrift Fri Jan 18 22:06:44 2013 @@ -87,6 +87,35 @@ struct ActiveScan { 13:list<binary> authorizations } +enum CompactionType { + MINOR, + MERGE, + MAJOR, + FULL +} + +enum CompactionReason { + USER, + SYSTEM, + CHOP, + IDLE, + CLOSE +} + +struct ActiveCompaction { + 1:data.TKeyExtent extent + 2:i64 age + 3:i32 inputFiles + 4:string outputFile + 5:CompactionType type + 
6:CompactionReason reason + 7:string localityGroup + 8:i64 entriesRead + 9:i64 entriesWritten + 10:list<data.IterInfo> ssiList + 11:map<string, map<string, string>> ssio +} + struct TIteratorSetting { 1:i32 priority; 2:string name; @@ -157,6 +186,7 @@ service TabletClientService extends clie oneway void fastHalt(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string lock); list<ActiveScan> getActiveScans(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec) + list<ActiveCompaction> getActiveCompactions(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec) oneway void removeLogs(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:list<string> filenames) } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Fri Jan 18 22:06:44 2013 @@ -18,6 +18,8 @@ package org.apache.accumulo.server.table import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -32,6 +34,7 @@ import org.apache.accumulo.core.data.Byt import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import 
org.apache.accumulo.core.file.FileSKVWriter; @@ -43,6 +46,9 @@ import org.apache.accumulo.core.iterator import org.apache.accumulo.core.iterators.system.DeletingIterator; import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.iterators.system.TimeSettingIterator; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; +import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; +import org.apache.accumulo.core.tabletserver.thrift.CompactionType; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; @@ -51,6 +57,8 @@ import org.apache.accumulo.server.proble import org.apache.accumulo.server.problems.ProblemReportingIterator; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -82,8 +90,133 @@ public class Compactor implements Callab protected KeyExtent extent; private List<IteratorSetting> iterators; + // things to report + private String currentLocalityGroup = ""; + private long startTime; + private long currentEntriesRead = 0; + private long currentEntriesWritten = 0; + private long totalEntriesRead = 0; + private long totalEntriesWritten = 0; + private MajorCompactionReason reason; + protected MinorCompactionReason mincReason; + + private synchronized void updateStats(long read, long written) { + this.currentEntriesRead = read; + this.currentEntriesWritten = written; + } + + private synchronized void clearStats() { + totalEntriesRead = 0; + totalEntriesWritten = 0; + currentEntriesRead = 0; + 
currentEntriesWritten = 0; + currentLocalityGroup = ""; + } + /** Adds the current counts to the running totals and resets the current counts, so totals accumulate across successive rolls instead of being overwritten by the latest one. */ + private synchronized void rollStats() { + this.totalEntriesRead += currentEntriesRead; + this.totalEntriesWritten += currentEntriesWritten; + currentEntriesRead = 0; + currentEntriesWritten = 0; + } + + private synchronized void setLocalityGroup(String name) { + this.currentLocalityGroup = name; + } + + protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new HashSet<Compactor>()); + + public static class CompactionInfo { + + private Compactor compactor; + private String localityGroup; + private long entriesRead; + private long entriesWritten; + + CompactionInfo(Compactor compactor) { + // get a consistent snapshot of changing stats + synchronized (compactor) { + this.localityGroup = compactor.currentLocalityGroup; + this.entriesRead = compactor.totalEntriesRead + compactor.currentEntriesRead; + this.entriesWritten = compactor.totalEntriesWritten + compactor.currentEntriesWritten; + } + + this.compactor = compactor; + } + + public ActiveCompaction toThrift() { + + CompactionType type; + + if (compactor.imm != null) + if (compactor.filesToCompact.size() > 0) + type = CompactionType.MERGE; + else + type = CompactionType.MINOR; + else if (!compactor.propogateDeletes) + type = CompactionType.FULL; + else + type = CompactionType.MAJOR; + + CompactionReason reason; + + if (compactor.imm != null) + switch(compactor.mincReason){ + case USER: + reason = CompactionReason.USER; + break; + case CLOSE: + reason = CompactionReason.CLOSE; + break; + case SYSTEM: + default: + reason = CompactionReason.SYSTEM; + break; + } + else + switch (compactor.reason) { + case USER: + reason = CompactionReason.USER; + break; + case CHOP: + reason = CompactionReason.CHOP; + break; + case IDLE: + reason = CompactionReason.IDLE; + break; + case NORMAL: + default: + reason = CompactionReason.SYSTEM; + break; + } + + List<IterInfo> iiList = new ArrayList<IterInfo>(); + Map<String,Map<String,String>> 
iterOptions = new HashMap<String,Map<String,String>>(); + + for (IteratorSetting iterSetting : compactor.iterators) { + iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName())); + iterOptions.put(iterSetting.getName(), iterSetting.getOptions()); + } + + return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, compactor.filesToCompact.size(), + compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions); + } + } + + public static List<CompactionInfo> getRunningCompactions() { + ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>(); + + synchronized (runningCompactions) { + for (Compactor compactor : runningCompactions) { + compactions.add(new CompactionInfo(compactor)); + } + } + + return compactions; + } + Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes, - TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators) { + TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) { this.extent = extent; this.conf = conf; this.fs = fs; @@ -94,11 +227,14 @@ public class Compactor implements Callab this.acuTableConf = acuTableConf; this.env = env; this.iterators = iterators; + this.reason = reason; + + startTime = System.currentTimeMillis(); } Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes, TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) { - this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>()); + this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null); } 
public FileSystem getFileSystem() { @@ -119,7 +255,11 @@ public class Compactor implements Callab FileSKVWriter mfw = null; CompactionStats majCStats = new CompactionStats(); + + boolean remove = runningCompactions.add(this); + clearStats(); + try { FileOperations fileFactory = FileOperations.getInstance(); mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf); @@ -137,11 +277,13 @@ public class Compactor implements Callab if (mfw.supportsLocalityGroups()) { for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) { + setLocalityGroup(entry.getKey()); compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats); allColumnFamilies.addAll(entry.getValue()); } } + setLocalityGroup(""); compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats); long t2 = System.currentTimeMillis(); @@ -171,6 +313,10 @@ public class Compactor implements Callab log.error(e, e); throw e; } finally { + + if (remove) + runningCompactions.remove(this); + try { if (mfw != null) { // compaction must not have finished successfully, so close its output file @@ -281,8 +427,15 @@ public class Compactor implements Callab mfw.append(itr.getTopKey(), itr.getTopValue()); itr.next(); entriesCompacted++; + + if (entriesCompacted % 1024 == 0) { + // Periodically update stats, do not want to do this too often since its syncronized + updateStats(citr.getCount(), entriesCompacted); + } } + rollStats(); + if (itr.hasTop() && !env.isCompactionEnabled()) { // cancel major compaction operation try { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original) +++ 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Fri Jan 18 22:06:44 2013 @@ -33,6 +33,7 @@ import org.apache.accumulo.server.conf.T import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -52,7 +53,7 @@ public class MinorCompactor extends Comp } MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String mergeFile, DataFileValue dfv, String outputFile, TableConfiguration acuTableConf, - KeyExtent extent) { + KeyExtent extent, MinorCompactionReason mincReason) { super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() { @Override @@ -65,6 +66,8 @@ public class MinorCompactor extends Comp return IteratorScope.minc; } }); + + super.mincReason = mincReason; } private boolean isTableDeleting() { @@ -86,52 +89,57 @@ public class MinorCompactor extends Comp int maxSleepTime = 1000 * Constants.DEFAULT_MINOR_COMPACTION_MAX_SLEEP_TIME; boolean reportedProblem = false; - do { - try { - CompactionStats ret = super.call(); + runningCompactions.add(this); + try { + do { + try { + CompactionStats ret = super.call(); + + // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted, + // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes())); + + if (reportedProblem) { + ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile()); + } + + return ret; + } catch (IOException e) { + log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ..."); + 
ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); + reportedProblem = true; + } catch (RuntimeException e) { + // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the + // minor compaction would succeed + log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e); + ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); + reportedProblem = true; + } catch (CompactionCanceledException e) { + throw new IllegalStateException(e); + } - // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f secs | %,d bytes ",map.size(), entriesCompacted, - // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes())); + Random random = new Random(); - if (reportedProblem) { - ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile()); - } + int sleep = sleepTime + random.nextInt(sleepTime); + log.debug("MinC failed sleeping " + sleep + " ms before retrying"); + UtilWaitThread.sleep(sleep); + sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor)); - return ret; - } catch (IOException e) { - log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ..."); - ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); - reportedProblem = true; - } catch (RuntimeException e) { - // if this is coming from a user iterator, it is possible that the user could change the iterator config and that the - // minor compaction would succeed - log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + " retrying ...", e); - ProblemReports.getInstance().report(new 
ProblemReport(getExtent().getTableId().toString(), ProblemType.FILE_WRITE, getOutputFile(), e)); - reportedProblem = true; - } catch (CompactionCanceledException e) { - throw new IllegalStateException(e); - } - - Random random = new Random(); - - int sleep = sleepTime + random.nextInt(sleepTime); - log.debug("MinC failed sleeping " + sleep + " ms before retrying"); - UtilWaitThread.sleep(sleep); - sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor)); - - // clean up - try { - if (getFileSystem().exists(new Path(getOutputFile()))) { - getFileSystem().delete(new Path(getOutputFile()), true); + // clean up + try { + if (getFileSystem().exists(new Path(getOutputFile()))) { + getFileSystem().delete(new Path(getOutputFile()), true); + } + } catch (IOException e) { + log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage()); } - } catch (IOException e) { - log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage()); - } - - if (isTableDeleting()) - return new CompactionStats(0, 0); - - } while (true); + + if (isTableDeleting()) + return new CompactionStats(0, 0); + + } while (true); + } finally { + runningCompactions.remove(this); + } } } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Fri Jan 18 22:06:44 2013 @@ -149,12 +149,16 @@ public class Tablet { enum MajorCompactionReason { // do not change the order, the order of this enum determines the order // in which queued major compactions are executed - ALL, 
+ USER, CHOP, NORMAL, IDLE } + enum MinorCompactionReason { + USER, SYSTEM, CLOSE + } + public class CommitSession { private int seq; @@ -2123,7 +2127,7 @@ public class Tablet { } private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable, String tmpDatafile, String newDatafile, String mergeFile, - boolean hasQueueTime, long queued, CommitSession commitSession, long flushId) { + boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { boolean failed = false; long start = System.currentTimeMillis(); timer.incrementStatusMinor(); @@ -2138,7 +2142,7 @@ public class Tablet { if (mergeFile != null) dfv = datafileManager.getDatafileSizes().get(mergeFile); - MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent); + MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile, acuTableConf, extent, mincReason); CompactionStats stats = compactor.call(); span.stop(); @@ -2182,13 +2186,15 @@ public class Tablet { private DataFileValue stats; private String mergeFile; private long flushId; + private MinorCompactionReason mincReason; - MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId) { + MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { queued = System.currentTimeMillis(); minorCompactionWaitingToStart = true; this.commitSession = commitSession; this.mergeFile = mergeFile; this.flushId = flushId; + this.mincReason = mincReason; } public void run() { @@ -2219,7 +2225,7 @@ public class Tablet { span.stop(); span = Trace.start("compact"); this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation + "_tmp", newMapfileLocation, mergeFile, true, queued, - commitSession, flushId); + commitSession, flushId, mincReason); span.stop(); if (needsSplit()) { @@ -2240,14 +2246,14 @@ 
public class Tablet { } } - private synchronized MinorCompactionTask prepareForMinC(long flushId) { + private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) { CommitSession oldCommitSession = tabletMemory.prepareForMinC(); otherLogs = currentLogs; currentLogs = new HashSet<DfsLogger>(); String mergeFile = datafileManager.reserveMergingMinorCompactionFile(); - return new MinorCompactionTask(mergeFile, oldCommitSession, flushId); + return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason); } @@ -2283,7 +2289,7 @@ public class Tablet { // a race condition MetadataTable.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock()); } else if (initiateMinor) - initiateMinorCompaction(tableFlushID); + initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER); } finally { if (updateMetadata) { @@ -2296,7 +2302,7 @@ public class Tablet { } - boolean initiateMinorCompaction() { + boolean initiateMinorCompaction(MinorCompactionReason mincReason) { if (isClosed()) { // don't bother trying to get flush id if closed... could be closed after this check but that is ok... just trying to cut down on uneeded log messages.... 
return false; @@ -2310,10 +2316,10 @@ public class Tablet { log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage()); return false; } - return initiateMinorCompaction(flushId); + return initiateMinorCompaction(flushId, mincReason); } - boolean minorCompactNow() { + boolean minorCompactNow(MinorCompactionReason mincReason) { long flushId; try { flushId = getFlushID(); @@ -2321,22 +2327,22 @@ public class Tablet { log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " + e.getMessage()); return false; } - MinorCompactionTask mct = createMinorCompactionTask(flushId); + MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason); if (mct == null) return false; mct.run(); return true; } - boolean initiateMinorCompaction(long flushId) { - MinorCompactionTask mct = createMinorCompactionTask(flushId); + boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) { + MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason); if (mct == null) return false; tabletResources.executeMinorCompaction(mct); return true; } - private MinorCompactionTask createMinorCompactionTask(long flushId) { + private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason mincReason) { MinorCompactionTask mct; long t1, t2; @@ -2371,7 +2377,7 @@ public class Tablet { return null; } - mct = prepareForMinC(flushId); + mct = prepareForMinC(flushId, mincReason); t2 = System.currentTimeMillis(); } } finally { @@ -2647,7 +2653,7 @@ public class Tablet { tabletMemory.waitForMinC(); try { - mct = prepareForMinC(getFlushID()); + mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE); } catch (NoNodeException e) { throw new RuntimeException(e); } @@ -2700,7 +2706,7 @@ public class Tablet { if (saveState && tabletMemory.getMemTable().getNumEntries() > 0) { try { - prepareForMinC(getFlushID()).run(); + prepareForMinC(getFlushID(), 
MinorCompactionReason.CLOSE).run(); } catch (NoNodeException e) { throw new RuntimeException(e); } @@ -2864,7 +2870,7 @@ public class Tablet { if (cmp != 0) return cmp; - if (reason == MajorCompactionReason.ALL || reason == MajorCompactionReason.CHOP) { + if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) { // for these types of compactions want to do the oldest first cmp = (int) (queued - o.queued); if (cmp != 0) @@ -2895,7 +2901,7 @@ public class Tablet { public boolean needsMajorCompaction(MajorCompactionReason reason) { if (majorCompactionInProgress) return false; - if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.ALL) + if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER) return true; return tabletResources.needsMajorCompaction(datafileManager.getDatafileSizes(), reason); } @@ -3246,8 +3252,8 @@ public class Tablet { + datafileManager.abs2rel(new Path(compactTmpName))); // always propagate deletes, unless last batch - Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, - acuTableConf, extent, cenv, compactionIterators); + Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? 
propogateDeletes : true, acuTableConf, extent, + cenv, compactionIterators, reason); CompactionStats mcs = compactor.call(); @@ -3816,7 +3822,7 @@ public class Tablet { updateMetadata = true; lastCompactID = compactionId; } else - initiateMajorCompaction(MajorCompactionReason.ALL); + initiateMajorCompaction(MajorCompactionReason.USER); } if (updateMetadata) { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Jan 18 22:06:44 2013 @@ -111,6 +111,7 @@ import org.apache.accumulo.core.security import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; @@ -160,10 +161,12 @@ import org.apache.accumulo.server.securi import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.server.security.ZKAuthenticator; +import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo; import org.apache.accumulo.server.tabletserver.Tablet.CommitSession; import org.apache.accumulo.server.tabletserver.Tablet.KVEntry; import 
org.apache.accumulo.server.tabletserver.Tablet.LookupResult; import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch; import org.apache.accumulo.server.tabletserver.Tablet.Scanner; import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo; @@ -2121,6 +2124,24 @@ public class TabletServer extends Abstra } } + @Override + public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException { + try { + if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM)) + throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED); + } catch (AccumuloSecurityException e) { + throw e.asThriftException(); + } + + List<CompactionInfo> compactions = Compactor.getRunningCompactions(); + List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size()); + + for (CompactionInfo compactionInfo : compactions) { + ret.add(compactionInfo.toThrift()); + } + + return ret; + } } private class SplitRunner implements Runnable { @@ -2192,7 +2213,7 @@ public class TabletServer extends Abstra if (tablet.getLogCount() >= maxLogEntriesPerTablet) { log.debug("Initiating minor compaction for " + tablet.getExtent() + " because it has " + tablet.getLogCount() + " write ahead logs"); - tablet.initiateMinorCompaction(); + tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM); } synchronized (tablet) { @@ -2518,7 +2539,7 @@ public class TabletServer extends Abstra * it to the logs (the file will be in !METADATA, preventing replay of compacted data)... but do not want a majc to wipe the file out from !METADATA * and then have another process failure... 
this could cause duplicate data to replay */ - if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow()) { + if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.SYSTEM)) { throw new RuntimeException("Minor compaction after recovery fails for " + extentToOpen); } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Fri Jan 18 22:06:44 2013 @@ -51,6 +51,7 @@ import org.apache.accumulo.core.util.Uti import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason; +import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.hadoop.fs.FileSystem; @@ -349,7 +350,7 @@ public class TabletServerResourceManager continue; } - if (!tabletReport.getTablet().initiateMinorCompaction()) { + if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM)) { if (tabletReport.getTablet().isClosed()) { tabletReports.remove(tabletReport.getExtent()); log.debug("Ignoring memory manager recommendation: not minor compacting closed tablet " + keyExtent); @@ -545,7 +546,7 @@ public class TabletServerResourceManager // when too many files are open, we may want 
tablets to compact down // to one map file Map<String,Long> findMapFilesToCompact(SortedMap<String,DataFileValue> tabletFiles, MajorCompactionReason reason) { - if (reason == MajorCompactionReason.ALL) { + if (reason == MajorCompactionReason.USER) { Map<String,Long> files = new HashMap<String,Long>(); for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) { files.put(entry.getKey(), entry.getValue().getSize()); @@ -634,7 +635,7 @@ public class TabletServerResourceManager // int threshold; - if (reason == MajorCompactionReason.ALL) + if (reason == MajorCompactionReason.USER) return true; if (reason == MajorCompactionReason.IDLE) { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1435371&r1=1435370&r2=1435371&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java Fri Jan 18 22:06:44 2013 @@ -49,6 +49,7 @@ import org.apache.accumulo.core.master.t import org.apache.accumulo.core.security.thrift.AuthInfo; import org.apache.accumulo.core.security.thrift.SecurityErrorCode; import org.apache.accumulo.core.security.thrift.ThriftSecurityException; +import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction; import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; @@ -202,6 +203,11 @@ public class NullTserver { @Override public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames) throws TException { } + + @Override + 
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, AuthInfo credentials) throws ThriftSecurityException, TException { + return new ArrayList<ActiveCompaction>(); + } } static class Opts extends Help {
