http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 6f2c9a2..df0848d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; +import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; @@ -29,7 +30,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,7 +45,6 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; @@ -148,8 +147,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerOpts; -import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; @@ -1501,7 +1500,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { final AssignmentHandler ah = new AssignmentHandler(extent); // final Runnable ah = new LoggingRunnable(log, ); // Root tablet assignment must take place immediately - if (extent.isRootTablet()) { new Daemon("Root Tablet Assignment") { @Override @@ -1694,6 +1692,66 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { + String myname = getClientAddressString(); + myname = myname.replace(':', '+'); + Set<String> loggers = new HashSet<String>(); + logger.getLogFiles(loggers); + Set<String> loggerUUIDs = new HashSet<String>(); + for (String logger : loggers) + loggerUUIDs.add(new Path(logger).getName()); + + nextFile: for (String filename : filenames) { + String uuid = new Path(filename).getName(); + // skip any log we're currently using + if (loggerUUIDs.contains(uuid)) + continue nextFile; + + List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>(); + synchronized (onlineTablets) { + onlineTabletsCopy.addAll(onlineTablets.values()); + } + for (Tablet tablet : onlineTabletsCopy) { + for (String current : tablet.getCurrentLogFiles()) { + if (current.contains(uuid)) { + log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent()); + continue nextFile; + } + } + } + + try { + Path source = new Path(filename); + if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) { + Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives()); + fs.mkdirs(walogArchive); + Path dest = new Path(walogArchive, source.getName()); + log.info("Archiving walog " + source + " to " + dest); + if (!fs.rename(source, dest)) + log.error("rename is unsuccessful"); + } else { + log.info("Deleting walog " + filename); + Path sourcePath = new Path(filename); + if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath)) + && !fs.deleteRecursively(sourcePath)) + log.warn("Failed to delete walog " + source); + for (String recovery : ServerConstants.getRecoveryDirs()) { + Path recoveryPath = new Path(recovery, source.getName()); + try { + if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath)) + log.info("Deleted any recovery log " + filename); + } catch (FileNotFoundException ex) { + // ignore + } + } + } + } catch (IOException e) { + log.warn("Error attempting to delete write-ahead log " + filename + ": " + e); + } + } + } + + @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { try { checkPermission(credentials, null, "getActiveCompactions"); @@ -1714,23 +1772,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { - String log = logger.getLogFile(); - // Might be null if there no active logger - if (null == log) { - return Collections.emptyList(); - } - return Collections.singletonList(log); - } - - @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { - log.warn("Garbage collector is attempting to remove logs through the tablet server"); - log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + "Restart your file Garbage Collector."); + Set<String> logs = new HashSet<String>(); + logger.getLogFiles(logs); + return new ArrayList<String>(logs); } } private class SplitRunner implements Runnable { - private final Tablet tablet; + private Tablet tablet; public SplitRunner(Tablet tablet) { this.tablet = tablet; @@ -1984,7 +2033,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { log.error("Unexpected error ", e); } log.debug("Unassigning " + tls); - TabletStateStore.unassign(TabletServer.this, tls, null); + TabletStateStore.unassign(TabletServer.this, tls); } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); } catch (KeeperException e) { @@ -2188,6 +2237,29 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } + public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) { + if (!this.onlineTablets.containsKey(extent)) { + log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline"); + // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs, + // if it doesn't, we'll need to do the same recovery with the old files. + return; + } + + log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id); + long now = RelativeTime.currentTimeMillis(); + List<String> logSet = new ArrayList<String>(); + for (DfsLogger log : logs) + logSet.add(log.getFileName()); + LogEntry entry = new LogEntry(); + entry.extent = extent; + entry.tabletId = id; + entry.timestamp = now; + entry.server = logs.get(0).getLogger(); + entry.filename = logs.get(0).getFileName(); + entry.logSet = logSet; + MetadataTableUtil.addLogEntry(this, entry, getLock()); + } + private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); @@ -2906,7 +2978,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException { totalMinorCompactions.incrementAndGet(); logger.minorCompactionFinished(tablet, newDatafile, walogSeq); - markUnusedWALs(); } public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException { @@ -2925,11 +2996,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }); for (LogEntry entry : sorted) { Path recovery = null; - Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename)); - finished = SortedLogState.getFinishedMarkerPath(finished); - TabletServer.log.info("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); + for (String log : entry.logSet) { + Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log)); + finished = SortedLogState.getFinishedMarkerPath(finished); + TabletServer.log.info("Looking for " + finished); + if (fs.exists(finished)) { + recovery = finished.getParent(); + break; + } } if (recovery == null) throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry); @@ -2966,9 +3040,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } public Collection<Tablet> getOnlineTablets() { - synchronized (onlineTablets) { - return new ArrayList<Tablet>(onlineTablets.values()); - } + return Collections.unmodifiableCollection(onlineTablets.values()); } public VolumeManager getFileSystem() { @@ -2994,61 +3066,4 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public SecurityOperation getSecurityOperation() { return security; } - - // avoid unnecessary redundant markings to meta - final ConcurrentHashMap<DfsLogger,EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>(); - final Object levelLocks[] = new Object[TabletLevel.values().length]; - { - for (int i = 0; i < levelLocks.length; i++) { - levelLocks[i] = new Object(); - } - } - - // remove any meta entries after a rolled log is no longer referenced - Set<DfsLogger> closedLogs = new HashSet<>(); - - private void markUnusedWALs() { - Set<DfsLogger> candidates; - synchronized (closedLogs) { - candidates = new HashSet<>(closedLogs); - } - for (Tablet tablet : getOnlineTablets()) { - candidates.removeAll(tablet.getCurrentLogFiles()); - } - try { - Set<Path> filenames = new HashSet<>(); - for (DfsLogger candidate : candidates) { - filenames.add(candidate.getPath()); - } - MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames); - synchronized (closedLogs) { - closedLogs.removeAll(candidates); - } - } catch (AccumuloException ex) { - log.info(ex.toString(), ex); - } - } - - public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) { - // serialize the updates to the metadata per level: avoids updating the level more than once - // updating one level, may cause updates to other levels, so we need to release the lock on metadataTableLogs - synchronized (levelLocks[level.ordinal()]) { - EnumSet<TabletLevel> set = null; - set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level)); - if (set == null || !set.contains(level) || level == TabletLevel.ROOT) { - log.info("Writing log marker for level " + level + " " + copy.getFileName()); - MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level); - } - set = metadataTableLogs.get(copy); - set.add(level); - } - } - - public void walogClosed(DfsLogger currentLog) { - metadataTableLogs.remove(currentLog); - synchronized (closedLogs) { - closedLogs.add(currentLog); - } - } - }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index cd7ce08..8512690 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -72,7 +72,7 @@ import com.google.common.base.Optional; * Wrap a connection to a logger. * */ -public class DfsLogger implements Comparable<DfsLogger> { +public class DfsLogger { public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; @@ -371,7 +371,6 @@ public class DfsLogger implements Comparable<DfsLogger> { public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); - log.debug("Address is " + address); String logger = Joiner.on("+").join(address.split(":")); log.debug("DfsLogger.open() begin"); @@ -464,11 +463,7 @@ public class DfsLogger implements Comparable<DfsLogger> { } public String getFileName() { - return logPath; - } - - public Path getPath() { - return new Path(logPath); + return logPath.toString(); } public void close() throws IOException { @@ -614,9 +609,4 @@ public class DfsLogger implements Comparable<DfsLogger> { return Joiner.on(":").join(parts[parts.length - 2].split("[+]")); } - @Override - public int compareTo(DfsLogger o) { - return getFileName().compareTo(o.getFileName()); - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index ab3dea2..37882cd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -180,7 +180,7 @@ public class SortedLogRecovery { // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to while (reader.next(key, value)) { - // log.debug("Event " + key.event + " tablet " + key.tablet); + // LogReader.printEntry(entry); if (key.event != DEFINE_TABLET) break; if (key.tablet.equals(extent) || key.tablet.equals(alternative)) { @@ -209,7 +209,7 @@ public class SortedLogRecovery { if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.COMPLETE; if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); lastStartToFinish.update(fileno, key.seq); // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. @@ -218,7 +218,7 @@ public class SortedLogRecovery { lastStartToFinish.update(-1); } else if (key.event == COMPACTION_FINISH) { if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH; else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart) @@ -249,6 +249,8 @@ public class SortedLogRecovery { break; if (key.tid != tid) break; + // log.info("Replaying " + key); + // log.info(value); if (key.event == MUTATION) { mr.receive(value.mutations.get(0)); } else if (key.event == MANY_MUTATIONS) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 3fb3c86..1d385d9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -21,16 +21,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,9 +37,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; -import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.replication.StatusUtil; @@ -76,22 +72,20 @@ public class TabletServerLogger { private final TabletServer tserver; - // The current logger - private DfsLogger currentLog = null; - private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>(); - private ThreadPoolExecutor nextLogMaker; + // The current log set: always updated to a new set with every change of loggers + private final List<DfsLogger> loggers = new ArrayList<DfsLogger>(); - // The current generation of logs. - // Because multiple threads can be using a log at one time, a log + // The current generation of logSet. + // Because multiple threads can be using a log set at one time, a log // failure is likely to affect multiple threads, who will all attempt to - // create a new log. This will cause many unnecessary updates to the + // create a new logSet. This will cause many unnecessary updates to the // metadata table. // We'll use this generational counter to determine if another thread has - // already fetched a new log. - private final AtomicInteger logId = new AtomicInteger(); + // already fetched a new logSet. + private AtomicInteger logSetId = new AtomicInteger(); // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them - private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock(); private final AtomicInteger seqGen = new AtomicInteger(); @@ -152,74 +146,62 @@ public class TabletServerLogger { this.flushCounter = flushCounter; } - private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { - final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>(); - testLockAndRun(logIdLock, new TestCallWithWriteLock() { + private int initializeLoggers(final List<DfsLogger> copy) throws IOException { + final int[] result = {-1}; + testLockAndRun(logSetLock, new TestCallWithWriteLock() { @Override boolean test() { - result.set(currentLog); - if (currentLog != null) - logIdOut.set(logId.get()); - return currentLog == null; + copy.clear(); + copy.addAll(loggers); + if (!loggers.isEmpty()) + result[0] = logSetId.get(); + return loggers.isEmpty(); } @Override void withWriteLock() throws IOException { try { - createLogger(); - result.set(currentLog); - if (currentLog != null) - logIdOut.set(logId.get()); + createLoggers(); + copy.clear(); + copy.addAll(loggers); + if (copy.size() > 0) + result[0] = logSetId.get(); else - logIdOut.set(-1); + result[0] = -1; } catch (IOException e) { log.error("Unable to create loggers", e); } } }); - return result.get(); + return result[0]; } - /** - * Get the current WAL file - * - * @return The name of the current log, or null if there is no current log. - */ - public String getLogFile() { - logIdLock.readLock().lock(); + public void getLogFiles(Set<String> loggersOut) { + logSetLock.readLock().lock(); try { - if (null == currentLog) { - return null; + for (DfsLogger logger : loggers) { + loggersOut.add(logger.getFileName()); } - return currentLog.getFileName(); } finally { - logIdLock.readLock().unlock(); + logSetLock.readLock().unlock(); } } - synchronized private void createLogger() throws IOException { - if (!logIdLock.isWriteLockedByCurrentThread()) { + synchronized private void createLoggers() throws IOException { + if (!logSetLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("createLoggers should be called with write lock held!"); } - if (currentLog != null) { - throw new IllegalStateException("createLoggers should not be called when current log is set"); + if (loggers.size() != 0) { + throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size()); } try { - startLogMaker(); - Object next = nextLog.take(); - if (next instanceof Exception) { - throw (Exception) next; - } - if (next instanceof DfsLogger) { - currentLog = (DfsLogger) next; - logId.incrementAndGet(); - log.info("Using next log " + currentLog.getFileName()); - return; - } else { - throw new RuntimeException("Error: unexpected type seen: " + next); - } + DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); + alog.open(tserver.getClientAddressString()); + loggers.add(alog); + logSetId.incrementAndGet(); + return; } catch (Exception t) { walErrors.put(System.currentTimeMillis(), ""); if (walErrors.size() >= HALT_AFTER_ERROR_COUNT) { @@ -229,63 +211,22 @@ public class TabletServerLogger { } } - private synchronized void startLogMaker() { - if (nextLogMaker != null) { - return; - } - nextLogMaker = new SimpleThreadPool(1, "WALog creator"); - nextLogMaker.submit(new Runnable() { - @Override - public void run() { - while (!nextLogMaker.isShutdown()) { - try { - log.debug("Creating next WAL"); - DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); - log.debug("Created next WAL " + alog.getFileName()); - while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { - log.info("Our WAL was not used for 12 hours: " + alog.getFileName()); - } - } catch (Exception t) { - log.error("{}", t.getMessage(), t); - try { - nextLog.offer(t, 12, TimeUnit.HOURS); - } catch (InterruptedException ex) { - // ignore - } - } - } - } - }); - } - - public void resetLoggers() throws IOException { - logIdLock.writeLock().lock(); - try { - close(); - } finally { - logIdLock.writeLock().unlock(); - } - } - synchronized private void close() throws IOException { - if (!logIdLock.isWriteLockedByCurrentThread()) { + if (!logSetLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("close should be called with write lock held!"); } try { - if (null != currentLog) { + for (DfsLogger logger : loggers) { try { - currentLog.close(); + logger.close(); } catch (DfsLogger.LogClosedException ex) { // ignore } catch (Throwable ex) { - log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex); - } finally { - this.tserver.walogClosed(currentLog); + log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex); } - currentLog = null; - logSizeEstimate.set(0); } + loggers.clear(); + logSizeEstimate.set(0); } catch (Throwable t) { throw new IOException(t); } @@ -302,7 +243,7 @@ public class TabletServerLogger { private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException { // Work very hard not to lock this during calls to the outside world - int currentLogId = logId.get(); + int currentLogSet = logSetId.get(); int seq = -1; int attempt = 1; @@ -310,22 +251,20 @@ public class TabletServerLogger { while (!success) { try { // get a reference to the loggers that no other thread can touch - DfsLogger copy = null; - AtomicInteger currentId = new AtomicInteger(-1); - copy = initializeLoggers(currentId); - currentLogId = currentId.get(); + ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>(); + currentLogSet = initializeLoggers(copy); // add the logger to the log set for the memory in the tablet, // update the metadata table if we've never used this tablet - if (currentLogId == logId.get()) { + if (currentLogSet == logSetId.get()) { for (CommitSession commitSession : sessions) { if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { try { // Scribble out a tablet definition and then write to the metadata table defineTablet(commitSession); - if (currentLogId == logId.get()) - tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent())); + if (currentLogSet == logSetId.get()) + tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId()); } finally { commitSession.finishUpdatingLogsUsed(); } @@ -333,29 +272,39 @@ public class TabletServerLogger { // Need to release KeyExtent extent = commitSession.getExtent(); if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) { - Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis()); - log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName()); + Set<String> logs = new HashSet<String>(); + for (DfsLogger logger : copy) { + logs.add(logger.getFileName()); + } + Status status = StatusUtil.fileCreated(System.currentTimeMillis()); + log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs); // Got some new WALs, note this in the metadata table - ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status); + ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status); } } } } // Make sure that the logs haven't changed out from underneath our copy - if (currentLogId == logId.get()) { + if (currentLogSet == logSetId.get()) { // write the mutation to the logs seq = seqGen.incrementAndGet(); if (seq < 0) throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven"); - LoggerOperation lop = writer.write(copy, seq); - if (lop != null) { + ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size()); + for (DfsLogger wal : copy) { + LoggerOperation lop = writer.write(wal, seq); + if (lop != null) + queuedOperations.add(lop); + } + + for (LoggerOperation lop : queuedOperations) { lop.await(); } // double-check: did the log set change? - success = (currentLogId == logId.get()); + success = (currentLogSet == logSetId.get()); } } catch (DfsLogger.LogClosedException ex) { log.debug("Logs closed while writing, retrying " + attempt); @@ -370,13 +319,13 @@ public class TabletServerLogger { // Some sort of write failure occurred. Grab the write lock and reset the logs. // But since multiple threads will attempt it, only attempt the reset when // the logs haven't changed. - final int finalCurrent = currentLogId; + final int finalCurrent = currentLogSet; if (!success) { - testLockAndRun(logIdLock, new TestCallWithWriteLock() { + testLockAndRun(logSetLock, new TestCallWithWriteLock() { @Override boolean test() { - return finalCurrent == logId.get(); + return finalCurrent == logSetId.get(); } @Override @@ -389,7 +338,7 @@ public class TabletServerLogger { } // if the log gets too big, reset it .. grab the write lock first logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead - testLockAndRun(logIdLock, new TestCallWithWriteLock() { + testLockAndRun(logSetLock, new TestCallWithWriteLock() { @Override boolean test() { return logSizeEstimate.get() > maxSize; http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java index dee705c..d908f1d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.tserver.tablet; +import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.data.Mutation; @@ -85,7 +86,7 @@ public class CommitSession { return committer; } - public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) { return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index ab15ccc..db1b418 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -424,9 +424,7 @@ class DatafileManager { if (log.isDebugEnabled()) { log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly); } - for (String logFile : logFileOnly) { - ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength()); - } + ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength()); } } finally { tablet.finishClearingUnusedLogs(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 17864be..1f4625b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -37,7 +37,6 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -201,7 +200,7 @@ public class Tablet implements TabletCommitter { } // stores info about user initiated major compaction that is waiting on a minor compaction to finish - private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); + private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); static enum CompactionState { WAITING_TO_START, IN_PROGRESS @@ -628,8 +627,8 @@ public class Tablet implements TabletCommitter { // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress. Status status = StatusUtil.openWithUnknownLength(); for (LogEntry logEntry : logEntries) { - log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status)); - ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status); + log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status)); + ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status); } } @@ -641,9 +640,11 @@ public class Tablet implements TabletCommitter { } } // make some closed references that represent the recovered logs - currentLogs = new ConcurrentSkipListSet<DfsLogger>(); + currentLogs = new HashSet<DfsLogger>(); for (LogEntry logEntry : logEntries) { - currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString())); + for (String log : logEntry.logSet) { + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString())); + } } log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries() @@ -934,9 +935,7 @@ public class Tablet implements TabletCommitter { long count = 0; - String oldName = Thread.currentThread().getName(); try { - Thread.currentThread().setName("Minor compacting " + this.extent); Span span = Trace.start("write"); CompactionStats stats; try { @@ -967,7 +966,6 @@ public class Tablet implements TabletCommitter { failed = true; throw new RuntimeException(e); } finally { - Thread.currentThread().setName(oldName); try { getTabletMemory().finalizeMinC(); } catch (Throwable t) { @@ -992,7 +990,7 @@ public class Tablet implements TabletCommitter { private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) { CommitSession oldCommitSession = getTabletMemory().prepareForMinC(); otherLogs = currentLogs; - currentLogs = new ConcurrentSkipListSet<DfsLogger>(); + currentLogs = new HashSet<DfsLogger>(); FileRef mergeFile = null; if (mincReason != MinorCompactionReason.RECOVERY) { @@ -2376,11 +2374,14 @@ public class Tablet implements TabletCommitter { } } - private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>(); + private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>(); - // currentLogs may be updated while a tablet is otherwise locked - public Set<DfsLogger> getCurrentLogFiles() { - return new HashSet<DfsLogger>(currentLogs); + public synchronized Set<String> getCurrentLogFiles() { + Set<String> result = new HashSet<String>(); + for (DfsLogger log : currentLogs) { + result.add(log.getFileName()); + } + return result; } Set<String> beginClearingUnusedLogs() { @@ -2439,13 +2440,13 @@ public class Tablet implements TabletCommitter { // this lock is basically used to synchronize writing of log info to metadata private final ReentrantLock logLock = new ReentrantLock(); - public int getLogCount() { + public synchronized int getLogCount() { return currentLogs.size(); } // don't release the lock if this method returns true for success; instead, the caller should clean up by calling finishUpdatingLogsUsed() @Override - public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) { boolean releaseLock = true; @@ -2482,26 +2483,28 @@ public class Tablet implements TabletCommitter { int numAdded = 0; int numContained = 0; - if (addToOther) { - if (otherLogs.add(more)) - numAdded++; + for (DfsLogger logger : more) { + if (addToOther) { + if (otherLogs.add(logger)) + numAdded++; - if (currentLogs.contains(more)) - numContained++; - } else { - if (currentLogs.add(more)) - numAdded++; + if (currentLogs.contains(logger)) + numContained++; + } else { + if (currentLogs.add(logger)) + numAdded++; - if (otherLogs.contains(more)) - numContained++; + if (otherLogs.contains(logger)) + numContained++; + } } - if (numAdded > 0 && numAdded != 1) { + if (numAdded > 0 && numAdded != more.size()) { // expect to add all or none throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs); } - if (numContained > 0 && numContained != 1) { + if (numContained > 0 && numContained != more.size()) { // expect to contain all or none throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java index 934ce20..c7e3a66 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.tserver.tablet; +import java.util.Collection; import java.util.List; import org.apache.accumulo.core.client.Durability; @@ -37,7 +38,7 @@ public interface TabletCommitter { /** * If this method returns true, the caller must call {@link #finishUpdatingLogsUsed()} to clean up */ - boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish); + boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish); void finishUpdatingLogsUsed(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java deleted file mode 100644 index 44058d3..0000000 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.tserver.log; - -import static org.junit.Assert.assertEquals; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -public class LogEntryTest { - - @Test - public void test() throws Exception { - KeyExtent extent = new KeyExtent(new Text("1"), null, new Text("")); - long ts = 12345678L; - String server = "localhost:1234"; - String filename = "default/foo"; - LogEntry entry = new LogEntry(extent, ts, server, filename); - assertEquals(extent, entry.extent); - assertEquals(server, entry.server); - assertEquals(filename, entry.filename); - assertEquals(ts, entry.timestamp); - assertEquals("1<; default/foo", entry.toString()); - assertEquals(new Text("log"), entry.getColumnFamily()); - assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier()); - LogEntry copy = LogEntry.fromBytes(entry.toBytes()); - assertEquals(entry.toString(), copy.toString()); - Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo")); - key.setTimestamp(ts); - LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue()); - assertEquals(entry.toString(), copy2.toString()); - assertEquals(entry.timestamp, copy2.timestamp); - assertEquals("foo", entry.getUniqueID()); - assertEquals("localhost:1234/default/foo", entry.getName()); - assertEquals(new Value("default/foo".getBytes()), entry.getValue()); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 1186c68..d0de29f 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -202,6 +202,9 @@ public class NullTserver { } @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {} + + @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { return new ArrayList<ActiveCompaction>(); } @@ -228,9 +231,6 @@ public class NullTserver { public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { return null; } - - @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { } } static class Opts extends Help { http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java index 53653da..6338e00 100644 --- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java +++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java @@ -60,11 +60,6 @@ import com.google.common.net.HostAndPort; public class ProxyDurabilityIT extends ConfigurableMacIT { @Override - protected int defaultTimeoutSeconds() { - return 60; - } - - @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); @@ -116,7 +111,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT { assertEquals(0, count(tableName)); ConditionalWriterOptions cfg = new ConditionalWriterOptions(); - cfg.setDurability(Durability.SYNC); + cfg.setDurability(Durability.LOG); String cwriter = client.createConditionalWriter(login, tableName, cfg); ConditionalUpdates updates = new ConditionalUpdates(); updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes("")))); @@ -125,7 +120,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT { assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row"))); assertEquals(1, count(tableName)); restartTServer(); - assertEquals(1, count(tableName)); + assertEquals(0, count(tableName)); proxyServer.stop(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java index 0dcdf42..25337b2 100644 --- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java +++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -54,7 +54,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT { @Override public int defaultTimeoutSeconds() { - return 120; + return 60; } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/BalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java index 8703f18..f793925 100644 --- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java +++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java @@ -20,33 +20,25 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.accumulo.test.functional.ConfigurableMacIT; import org.apache.hadoop.io.Text; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class BalanceIT extends AccumuloClusterIT { - private static final Logger log = LoggerFactory.getLogger(BalanceIT.class); +public class BalanceIT extends ConfigurableMacIT { - @Override - public int defaultTimeoutSeconds() { - return 60; - } - - @Test + @Test(timeout = 60 * 1000) public void testBalance() throws Exception { String tableName = getUniqueNames(1)[0]; Connector c = getConnector(); - log.info("Creating table"); + System.out.println("Creating table"); c.tableOperations().create(tableName); SortedSet<Text> splits = new TreeSet<Text>(); for (int i = 0; i < 10; i++) { splits.add(new Text("" + i)); } - log.info("Adding splits"); + System.out.println("Adding splits"); c.tableOperations().addSplits(tableName, splits); - log.info("Waiting for balance"); + System.out.println("Waiting for balance"); c.instanceOperations().waitForBalance(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java index 3f9e1cc..d754a14 100644 --- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java +++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java @@ -129,7 +129,6 @@ public class CleanWalIT extends AccumuloClusterIT { private int countLogs(String tableName, Connector conn) throws TableNotFoundException { Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); - scanner.setRange(MetadataSchema.TabletsSection.getRange()); int count = 0; for (Entry<Key,Value> entry : scanner) { log.debug("Saw " + entry.getKey() + "=" + entry.getValue()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java index 65be396..b7637a6 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -1294,7 +1294,6 @@ public class ConditionalWriterIT extends AccumuloClusterIT { conn.tableOperations().create(tableName); DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig()); - UtilWaitThread.sleep(1000); Span root = Trace.on("traceTest"); ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java deleted file mode 100644 index 96ae579..0000000 --- a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import static org.junit.Assert.assertEquals; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.fate.util.UtilWaitThread; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.fs.RemoteIterator; -import org.junit.Test; - -import com.google.common.collect.Iterators; - -public class GarbageCollectWALIT extends ConfigurableMacIT { - - @Override - protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_HOST, "5s"); - cfg.setProperty(Property.GC_CYCLE_START, "1s"); - cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); - cfg.setNumTservers(1); - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - @Test(timeout = 2 * 60 * 1000) - public void test() throws Exception { - // not yet, please - String tableName = getUniqueNames(1)[0]; - cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); - Connector c = getConnector(); - c.tableOperations().create(tableName); - // count the number of WALs in the filesystem - assertEquals(2, countWALsInFS(cluster)); - cluster.getClusterControl().stop(ServerType.TABLET_SERVER); - cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR); - cluster.getClusterControl().start(ServerType.TABLET_SERVER); - Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator()); - // let GC run - UtilWaitThread.sleep(3 * 5 * 1000); - assertEquals(2, countWALsInFS(cluster)); - } - - private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception { - FileSystem fs = cluster.getFileSystem(); - RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true); - int result = 0; - while (iterator.hasNext()) { - LocatedFileStatus next = iterator.next(); - if (!next.isDirectory()) { - result++; - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java index 27f1f69..b78a311 100644 --- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.File; +import java.util.Collections; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; @@ -26,7 +27,6 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; @@ -127,7 +127,11 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString()); + LogEntry logEntry = new LogEntry(); + logEntry.server = "127.0.0.1:12345"; + logEntry.filename = emptyWalog.toURI().toString(); + logEntry.tabletId = 10; + logEntry.logSet = Collections.singleton(logEntry.filename); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); @@ -182,7 +186,11 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString()); + LogEntry logEntry = new LogEntry(); + logEntry.server = "127.0.0.1:12345"; + logEntry.filename = partialHeaderWalog.toURI().toString(); + logEntry.tabletId = 10; + logEntry.logSet = Collections.singleton(logEntry.filename); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java new file mode 100644 index 0000000..6a9975c --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.harness.AccumuloClusterIT; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.FunctionalTestUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Verify that a recovery of a log without any mutations removes the log reference +public class NoMutationRecoveryIT extends AccumuloClusterIT { + private static final Logger log = LoggerFactory.getLogger(NoMutationRecoveryIT.class); + + @Override + public int defaultTimeoutSeconds() { + return 10 * 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Before + public void takeTraceTableOffline() throws Exception { + Connector conn = getConnector(); + if (conn.tableOperations().exists("trace")) { + conn.tableOperations().offline("trace", true); + } + } + + @After + public void takeTraceTableOnline() throws Exception { + Connector conn = getConnector(); + if (conn.tableOperations().exists("trace")) { + conn.tableOperations().online("trace", true); + } + } + + public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) { + // comparison, without timestamp + Key akey = a.getKey(); + Key bkey = b.getKey(); + log.info("Comparing {} to {}", akey.toStringNoTruncate(), bkey.toStringNoTruncate()); + return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue()); + } + + @Test + public void test() throws Exception { + Connector conn = getConnector(); + final String table = getUniqueNames(1)[0]; + conn.tableOperations().create(table); + String tableId = conn.tableOperations().tableIdMap().get(table); + + log.info("Created {} with id {}", table, tableId); + + // Add a record to the table + update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes())); + + // Get the WAL reference used by the table we just added the update to + Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME); + + log.info("Log reference in metadata table {} {}", logRef.getKey().toStringNoTruncate(), logRef.getValue()); + + // Flush the record to disk + conn.tableOperations().flush(table, null, null, true); + + Range range = Range.prefix(tableId); + log.info("Fetching WAL references over " + table); + assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, range))); + + // Grant permission to the admin user to write to the Metadata table + conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); + + // Add the wal record back to the metadata table + update(conn, MetadataTable.NAME, logRef); + + // Assert that we can get the bogus update back out again + assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME))); + + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + conn.tableOperations().flush(RootTable.NAME, null, null, true); + + ClusterControl control = cluster.getClusterControl(); + control.stopAllServers(ServerType.TABLET_SERVER); + control.startAllServers(ServerType.TABLET_SERVER); + + // Verify that we can read the original record we wrote + Scanner s = conn.createScanner(table, Authorizations.EMPTY); + int count = 0; + for (Entry<Key,Value> e : s) { + assertEquals(e.getKey().getRow().toString(), "row"); + assertEquals(e.getKey().getColumnFamily().toString(), "cf"); + assertEquals(e.getKey().getColumnQualifier().toString(), "cq"); + assertEquals(e.getValue().toString(), "value"); + count++; + } + assertEquals(1, count); + + // Verify that the bogus log reference we wrote it gone + for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) { + assertFalse("Unexpected found reference to bogus log entry: " + ref.getKey().toStringNoTruncate() + " " + ref.getValue(), equals(ref, logRef)); + } + } + + private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception { + Key k = logRef.getKey(); + update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue()); + } + + private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception { + return getLogRefs(conn, table, new Range()); + } + + private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception { + Scanner s = conn.createScanner(table, Authorizations.EMPTY); + s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + s.setRange(r); + return s; + } + + private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception { + return getLogRefs(conn, table).iterator().next(); + } + + private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception { + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation(row); + m.put(cf, cq, value); + bw.addMutation(m); + bw.close(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 6618a65..f5c211c 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -350,7 +350,7 @@ public class ShellServerIT extends SharedMiniClusterIT { ts.exec("config -t " + table2 + " -np", true, "345M", true); ts.exec("getsplits -t " + table2, true, "row5", true); ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true); - ts.exec("online " + table, true); + ts.exec("onlinetable " + table, true); ts.exec("deletetable -f " + table, true); ts.exec("deletetable -f " + table2, true); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java deleted file mode 100644 index 03d783c..0000000 --- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import static org.junit.Assert.assertEquals; - -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.junit.Test; - -import com.google.common.collect.Iterators; - -// When reviewing the changes for ACCUMULO-3423, kturner suggested -// "tablets will now have log references that contain no data, -// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data. -// It would be useful to have an IT that will test this situation. -public class UnusedWALIT extends ConfigurableMacIT { - - @Override - protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - final long logSize = 1024 * 1024 * 10; - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize)); - cfg.setNumTservers(1); - // use raw local file system so walogs sync and flush will work - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize)); - } - - @Test(timeout = 2 * 60 * 1000) - public void test() throws Exception { - // don't want this bad boy cleaning up walog entries - getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); - - // make two tables - String[] tableNames = getUniqueNames(2); - String bigTable = tableNames[0]; - String lilTable = tableNames[1]; - Connector c = getConnector(); - c.tableOperations().create(bigTable); - c.tableOperations().create(lilTable); - - // put some data in a log that should be replayed for both tables - writeSomeData(c, bigTable, 0, 10, 0, 10); - scanSomeData(c, bigTable, 0, 10, 0, 10); - writeSomeData(c, lilTable, 0, 1, 0, 1); - scanSomeData(c, lilTable, 0, 1, 0, 1); - assertEquals(1, getWALCount(c)); - - // roll the logs by pushing data into bigTable - writeSomeData(c, bigTable, 0, 3000, 0, 1000); - assertEquals(2, getWALCount(c)); - - // put some data in the latest log - writeSomeData(c, lilTable, 1, 10, 0, 10); - scanSomeData(c, lilTable, 1, 10, 0, 10); - - // bounce the tserver - getCluster().getClusterControl().stop(ServerType.TABLET_SERVER); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - - // wait for the metadata table to be online - Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator()); - - // check our two sets of data in different logs - scanSomeData(c, lilTable, 0, 1, 0, 1); - scanSomeData(c, lilTable, 1, 10, 0, 10); - } - - private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception { - Scanner s = c.createScanner(table, Authorizations.EMPTY); - s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount))); - int row = startRow; - int col = startCol; - for (Entry<Key,Value> entry : s) { - assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16)); - assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16)); - if (col == startCol + colCount) { - col = startCol; - row++; - if (row == startRow + rowCount) { - break; - } - } - } - assertEquals(row, startRow + rowCount); - } - - private int getWALCount(Connector c) throws Exception { - Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - s.setRange(CurrentLogsSection.getRange()); - try { - return Iterators.size(s.iterator()); - } finally { - s.close(); - } - } - - private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception { - BatchWriterConfig config = new BatchWriterConfig(); - config.setMaxMemory(10 * 1024 * 1024); - BatchWriter bw = conn.createBatchWriter(table, config); - for (int r = startRow; r < startRow + rowCount; r++) { - Mutation m = new Mutation(Integer.toHexString(r)); - for (int c = startCol; c < startCol + colCount; c++) { - m.put("", Integer.toHexString(c), ""); - } - bw.addMutation(m); - } - bw.close(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/VolumeIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java index 2b24219..d9b9429 100644 --- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java +++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java @@ -103,7 +103,6 @@ public class VolumeIT extends ConfigurableMacIT { cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath()); cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost()); cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); @@ -426,21 +425,6 @@ public class VolumeIT extends ConfigurableMacIT { Assert.fail("Unexpected volume " + path); } - Text path = new Text(); - for (String table : new String[] {RootTable.NAME, MetadataTable.NAME}) { - Scanner meta = conn.createScanner(table, Authorizations.EMPTY); - meta.setRange(MetadataSchema.CurrentLogsSection.getRange()); - outer: for (Entry<Key,Value> entry : meta) { - MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); - for (int i = 0; i < paths.length; i++) { - if (path.toString().startsWith(paths[i].toString())) { - continue outer; - } - } - Assert.fail("Unexpected volume " + path); - } - } - // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes - // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18 @@ -451,7 +435,6 @@ public class VolumeIT extends ConfigurableMacIT { } Assert.assertEquals(200, sum); - } @Test http://git-wip-us.apache.org/repos/asf/accumulo/blob/36ca2575/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 75fd4e1..5c3694a 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -57,7 +57,6 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -73,11 +72,9 @@ import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.TestMultiTableIngest; import org.apache.accumulo.test.VerifyIngest; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -88,11 +85,6 @@ import com.google.common.base.Charsets; import com.google.common.collect.Iterators; public class ReadWriteIT extends AccumuloClusterIT { - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); - } - private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class); static final int ROWS = 200000;
