http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 0e02f05,0000000..7158ea8 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@@ -1,598 -1,0 +1,598 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.log; + ++import static com.google.common.base.Charsets.UTF_8; +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.crypto.CryptoModule; +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule; +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.StringUtil; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.fs.VolumeManager; +import 
org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.tserver.TabletMutations; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +/** + * Wrap a connection to a logger. + * + */ +public class DfsLogger { + public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; + public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; + + private static Logger log = Logger.getLogger(DfsLogger.class); + + public static class LogClosedException extends IOException { + private static final long serialVersionUID = 1L; + + public LogClosedException() { + super("LogClosed"); + } + } + + /** + * A well-timed tabletserver failure could result in an incomplete header written to a write-ahead log. This exception is thrown when the header cannot be + * read from a WAL which should only happen when the tserver dies as described. 
+ */ + public static class LogHeaderIncompleteException extends IOException { + private static final long serialVersionUID = 1l; + + public LogHeaderIncompleteException(String msg) { + super(msg); + } + + public LogHeaderIncompleteException(String msg, Throwable cause) { + super(msg, cause); + } + + public LogHeaderIncompleteException(Throwable cause) { + super(cause); + } + } + + public static class DFSLoggerInputStreams { + + private FSDataInputStream originalInput; + private DataInputStream decryptingInputStream; + + public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) { + this.originalInput = originalInput; + this.decryptingInputStream = decryptingInputStream; + } + + public FSDataInputStream getOriginalInput() { + return originalInput; + } + + public void setOriginalInput(FSDataInputStream originalInput) { + this.originalInput = originalInput; + } + + public DataInputStream getDecryptingInputStream() { + return decryptingInputStream; + } + + public void setDecryptingInputStream(DataInputStream decryptingInputStream) { + this.decryptingInputStream = decryptingInputStream; + } + } + + public interface ServerResources { + AccumuloConfiguration getConfiguration(); + + VolumeManager getFileSystem(); + + Set<TServerInstance> getCurrentTServers(); + } + + private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>(); + + private final Object closeLock = new Object(); + + private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null); + + private static final LogFileValue EMPTY = new LogFileValue(); + + private boolean closed = false; + + private class LogSyncingTask implements Runnable { + + @Override + public void run() { + ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>(); + boolean sawClosedMarker = false; + while (!sawClosedMarker) { + work.clear(); + + try { + work.add(workQueue.take()); + } catch (InterruptedException ex) { + 
continue; + } + workQueue.drainTo(work); + + try { + sync.invoke(logFile); + } catch (Exception ex) { + log.warn("Exception syncing " + ex); + for (DfsLogger.LogWork logWork : work) { + logWork.exception = ex; + } + } + + for (DfsLogger.LogWork logWork : work) + if (logWork == CLOSED_MARKER) + sawClosedMarker = true; + else + logWork.latch.countDown(); + } + } + } + + static class LogWork { + CountDownLatch latch; + volatile Exception exception; + + public LogWork(CountDownLatch latch) { + this.latch = latch; + } + } + + public static class LoggerOperation { + private final LogWork work; + + public LoggerOperation(LogWork work) { + this.work = work; + } + + public void await() throws IOException { + try { + work.latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (work.exception != null) { + if (work.exception instanceof IOException) + throw (IOException) work.exception; + else if (work.exception instanceof RuntimeException) + throw (RuntimeException) work.exception; + else + throw new RuntimeException(work.exception); + } + } + } + + @Override + public boolean equals(Object obj) { + // filename is unique + if (obj == null) + return false; + if (obj instanceof DfsLogger) + return getFileName().equals(((DfsLogger) obj).getFileName()); + return false; + } + + @Override + public int hashCode() { + // filename is unique + return getFileName().hashCode(); + } + + private final ServerResources conf; + private FSDataOutputStream logFile; + private DataOutputStream encryptingLogFile = null; + private Method sync; + private String logPath; + private Daemon syncThread; + + /* Track what's actually in +r/!0 for this logger ref */ + private String metaReference; + + public DfsLogger(ServerResources conf) throws IOException { + this.conf = conf; + } + + /** + * Refernce a pre-existing log file. 
+ * + * @param meta + * the cq for the "log" entry in +r/!0 + */ + public DfsLogger(ServerResources conf, String filename, String meta) throws IOException { + this.conf = conf; + this.logPath = filename; + metaReference = meta; + } + + public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException { + FSDataInputStream input = fs.open(path); + DataInputStream decryptingInput = null; + + byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(); + byte[] magicBuffer = new byte[magic.length]; + try { + input.readFully(magicBuffer); + if (Arrays.equals(magicBuffer, magic)) { + // additional parameters it needs from the underlying stream. + String cryptoModuleClassname = input.readUTF(); + CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname); + + // Create the parameters and set the input stream into those parameters + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); + params.setEncryptedInputStream(input); + + // Create the plaintext input stream from the encrypted one + params = cryptoModule.getDecryptingInputStream(params); + + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } else { + input.seek(0); + byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes(); + byte[] magicBufferV2 = new byte[magicV2.length]; + input.readFully(magicBufferV2); + + if (Arrays.equals(magicBufferV2, magicV2)) { + // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class + // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be + // the NullCryptoModule (no crypto) or the DefaultCryptoModule. + + // If it's null, we won't have any parameters whatsoever. 
First, let's attempt to read + // parameters + Map<String,String> opts = new HashMap<String,String>(); + int count = input.readInt(); + for (int i = 0; i < count; i++) { + String key = input.readUTF(); + String value = input.readUTF(); + opts.put(key, value); + } + + if (opts.size() == 0) { + // NullCryptoModule, we're done + decryptingInput = input; + } else { + + // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot. + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory + .getCryptoModule(DefaultCryptoModule.class.getName()); + + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); + + input.seek(0); + input.readFully(magicBufferV2); + params.setEncryptedInputStream(input); + + params = cryptoModule.getDecryptingInputStream(params); + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } + + } else { + + input.seek(0); + decryptingInput = input; + } + + } + } catch (EOFException e) { + log.warn("Got EOFException trying to read WAL header information, assuming the rest of the file (" + path + ") has no data."); + // A TabletServer might have died before the (complete) header was written + throw new LogHeaderIncompleteException(e); + } + + return new DFSLoggerInputStreams(input, decryptingInput); + } + + public synchronized void open(String address) throws IOException { + String filename = UUID.randomUUID().toString(); + String logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); + + log.debug("DfsLogger.open() begin"); + VolumeManager fs = conf.getFileSystem(); + + logPath = fs.choose(ServerConstants.getBaseUris()) + Path.SEPARATOR + ServerConstants.WAL_DIR + Path.SEPARATOR + 
logger + Path.SEPARATOR + filename; + + metaReference = toString(); + try { + short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); + if (replication == 0) + replication = fs.getDefaultReplication(new Path(logPath)); + long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE); + if (blockSize == 0) + blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1); + if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC)) + logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize); + else + logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); + + String syncMethod = conf.getConfiguration().get(Property.TSERV_WAL_SYNC_METHOD); + try { + // hsync: send data to datanodes and sync the data to disk + sync = logFile.getClass().getMethod(syncMethod); + } catch (Exception ex) { + log.warn("Could not find configured " + syncMethod + " method, trying to fall back to old Hadoop sync method", ex); + + try { + // sync: send data to datanodes + sync = logFile.getClass().getMethod("sync"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // Initialize the crypto operations. + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf + .getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); + + // Initialize the log file with a header and the crypto params used to set up this log file. 
- logFile.write(LOG_FILE_HEADER_V3.getBytes(Constants.UTF8)); ++ logFile.write(LOG_FILE_HEADER_V3.getBytes(UTF_8)); + + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration()); + + NoFlushOutputStream nfos = new NoFlushOutputStream(logFile); + params.setPlaintextOutputStream(nfos); + + // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here, + // so that that crypto module can re-read its own parameters. + + logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); + + params = cryptoModule.getEncryptingOutputStream(params); + OutputStream encipheringOutputStream = params.getEncryptedOutputStream(); + + // If the module just kicks back our original stream, then just use it, don't wrap it in + // another data OutputStream. + if (encipheringOutputStream == nfos) { + log.debug("No enciphering, using raw output stream"); + encryptingLogFile = nfos; + } else { + log.debug("Enciphering found, wrapping in DataOutputStream"); + encryptingLogFile = new DataOutputStream(encipheringOutputStream); + } + + LogFileKey key = new LogFileKey(); + key.event = OPEN; + key.tserverSession = filename; + key.filename = filename; + write(key, EMPTY); + sync.invoke(logFile); + log.debug("Got new write-ahead log: " + this); + } catch (Exception ex) { + if (logFile != null) + logFile.close(); + logFile = null; + encryptingLogFile = null; + throw new IOException(ex); + } + + syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask())); + syncThread.setName("Accumulo WALog thread " + toString()); + syncThread.start(); + } + + @Override + public String toString() { + String fileName = getFileName(); + if (fileName.contains(":")) + return getLogger() + "/" + getFileName(); + return fileName; + } + + /** + * get the cq needed to reference this logger's entry in +r/!0 + */ + public String getMeta() { + if (null == metaReference) { + 
throw new IllegalStateException("logger doesn't have meta reference. " + this); + } + return metaReference; + } + + public String getFileName() { + return logPath.toString(); + } + + public void close() throws IOException { + + synchronized (closeLock) { + if (closed) + return; + // after closed is set to true, nothing else should be added to the queue + // CLOSED_MARKER should be the last thing on the queue, therefore when the + // background thread sees the marker and exits there should be nothing else + // to process... so nothing should be left waiting for the background + // thread to do work + closed = true; + workQueue.add(CLOSED_MARKER); + } + + // wait for background thread to finish before closing log file + if (syncThread != null) { + try { + syncThread.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // expect workq should be empty at this point + if (workQueue.size() != 0) { + log.error("WAL work queue not empty after sync thread exited"); + throw new IllegalStateException("WAL work queue not empty after sync thread exited"); + } + + if (encryptingLogFile != null) + try { + logFile.close(); + } catch (IOException ex) { + log.error(ex); + throw new LogClosedException(); + } + } + + public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException { + // write this log to the METADATA table + final LogFileKey key = new LogFileKey(); + key.event = DEFINE_TABLET; + key.seq = seq; + key.tid = tid; + key.tablet = tablet; + try { + write(key, EMPTY); + sync.invoke(logFile); + } catch (IllegalArgumentException e) { + log.error("Signature of sync method changed. 
Accumulo is likely incompatible with this version of Hadoop."); + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + log.error("Could not invoke sync method due to permission error."); + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + // Cause is null, or some other checked exception that was added later. + throw new RuntimeException(e); + } + } + } + + private synchronized void write(LogFileKey key, LogFileValue value) throws IOException { + key.write(encryptingLogFile); + value.write(encryptingLogFile); + encryptingLogFile.flush(); + } + + public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException { + return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation)))); + } + + private LoggerOperation logFileData(List<Pair<LogFileKey,LogFileValue>> keys) throws IOException { + DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1)); + synchronized (DfsLogger.this) { + try { + for (Pair<LogFileKey,LogFileValue> pair : keys) { + write(pair.getFirst(), pair.getSecond()); + } + } catch (ClosedChannelException ex) { + throw new LogClosedException(); + } catch (Exception e) { + log.error(e, e); + work.exception = e; + } + } + + synchronized (closeLock) { + // use a different lock for close check so that adding to work queue does not need + // to wait on walog I/O operations + + if (closed) + throw new LogClosedException(); + workQueue.add(work); + } + + return new LoggerOperation(work); + } + + public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException { + List<Pair<LogFileKey,LogFileValue>> data = new ArrayList<Pair<LogFileKey,LogFileValue>>(); + for 
(TabletMutations tabletMutations : mutations) { + LogFileKey key = new LogFileKey(); + key.event = MANY_MUTATIONS; + key.seq = tabletMutations.getSeq(); + key.tid = tabletMutations.getTid(); + LogFileValue value = new LogFileValue(); + value.mutations = tabletMutations.getMutations(); + data.add(new Pair<LogFileKey,LogFileValue>(key, value)); + } + return logFileData(data); + } + + public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { + LogFileKey key = new LogFileKey(); + key.event = COMPACTION_FINISH; + key.seq = seq; + key.tid = tid; + return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY))); + } + + public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { + LogFileKey key = new LogFileKey(); + key.event = COMPACTION_START; + key.seq = seq; + key.tid = tid; + key.filename = fqfn; + return logFileData(Collections.singletonList(new Pair<LogFileKey,LogFileValue>(key, EMPTY))); + } + + public String getLogger() { + String parts[] = logPath.split("/"); + return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":"); + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java index 997f71b,0000000..2c807fd mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java @@@ -1,96 -1,0 +1,97 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.logger; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.data.ColumnUpdate; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.server.data.ServerMutation; +import org.apache.hadoop.io.Writable; + +public class LogFileValue implements Writable { + + private static final List<Mutation> empty = Collections.emptyList(); + + public List<Mutation> mutations = empty; + + @Override + public void readFields(DataInput in) throws IOException { + int count = in.readInt(); + mutations = new ArrayList<Mutation>(count); + for (int i = 0; i < count; i++) { + ServerMutation mutation = new ServerMutation(); + mutation.readFields(in); + mutations.add(mutation); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(mutations.size()); + for (Mutation m : mutations) { + m.write(out); + } + } + + public static void print(LogFileValue value) { + System.out.println(value.toString()); + } + + private static String displayLabels(byte[] labels) { - String s = new String(labels, Constants.UTF8); ++ String s = new String(labels, UTF_8); + s = s.replace("&", " & "); + s = s.replace("|", " | "); + return s; + } + + public static String format(LogFileValue lfv, int maxMutations) { + if (lfv.mutations.size() == 0) + return ""; + StringBuilder builder = new StringBuilder(); + builder.append(lfv.mutations.size() + " mutations:\n"); + int i = 0; + for (Mutation m : lfv.mutations) { + if (i++ >= maxMutations) { + builder.append("..."); + break; + } - builder.append(" ").append(new String(m.getRow(), Constants.UTF8)).append("\n"); ++ builder.append(" ").append(new String(m.getRow(), UTF_8)).append("\n"); + for (ColumnUpdate update : 
m.getUpdates()) { + String value = new String(update.getValue()); - builder.append(" ").append(new String(update.getColumnFamily(), Constants.UTF8)).append(":") - .append(new String(update.getColumnQualifier(), Constants.UTF8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:") ++ builder.append(" ").append(new String(update.getColumnFamily(), UTF_8)).append(":") ++ .append(new String(update.getColumnQualifier(), UTF_8)).append(" ").append(update.hasTimestamp() ? "[user]:" : "[system]:") + .append(update.getTimestamp()).append(" [").append(displayLabels(update.getColumnVisibility())).append("] ") + .append(update.isDeleted() ? "<deleted>" : value).append("\n"); + } + } + return builder.toString(); + } + + @Override + public String toString() { + return format(this, 5); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java index c3f4fd0,0000000..29eefc8 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java @@@ -1,178 -1,0 +1,179 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.tserver.logger;

import static com.google.common.base.Charsets.UTF_8;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
import org.apache.accumulo.tserver.log.MultiReader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

/**
 * Command-line tool that prints the entries of write-ahead log files, optionally filtered by row, row regex or key extent.
 */
public class LogReader {
  private static final Logger log = Logger.getLogger(LogReader.class);

  static class Opts extends Help {
    @Parameter(names = "-r", description = "print only mutations associated with the given row")
    String row;
    @Parameter(names = "-m", description = "limit the number of mutations printed per row")
    int maxMutations = 5;
    @Parameter(names = "-t", description = "print only mutations that fall within the given key extent")
    String extent;
    @Parameter(names = "-p", description = "search for a row that matches the given regex")
    String regexp;
    @Parameter(description = "<logfile> { <logfile> ... }")
    List<String> files = new ArrayList<String>();
  }

  /**
   * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
   *
   * @param args
   *          - first argument is the file to print
   * @throws IllegalArgumentException
   *           if the {@code -t} extent does not consist of three ';'-separated parts
   */
  public static void main(String[] args) throws IOException {
    Opts opts = new Opts();
    opts.parseArgs(LogReader.class.getName(), args);
    VolumeManager fs = VolumeManagerImpl.get();

    Matcher rowMatcher = null;
    KeyExtent ke = null;
    Text row = null;
    if (opts.files.isEmpty()) {
      new JCommander(opts).usage();
      return;
    }
    if (opts.row != null)
      row = new Text(opts.row);
    if (opts.extent != null) {
      String sa[] = opts.extent.split(";");
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException on a malformed extent.
      if (sa.length < 3)
        throw new IllegalArgumentException("Expected a key extent made of three ';'-separated parts but got: " + opts.extent);
      ke = new KeyExtent(new Text(sa[0]), new Text(sa[1]), new Text(sa[2]));
    }
    if (opts.regexp != null) {
      Pattern pattern = Pattern.compile(opts.regexp);
      rowMatcher = pattern.matcher("");
    }

    // Tablet ids whose DEFINE_TABLET event matched the requested extent; shared across all files.
    Set<Integer> tabletIds = new HashSet<Integer>();

    for (String file : opts.files) {

      Path path = new Path(file);
      LogFileKey key = new LogFileKey();
      LogFileValue value = new LogFileValue();

      if (fs.isFile(path)) {
        // read log entries from a simple hdfs file
        DFSLoggerInputStreams streams;
        try {
          streams = DfsLogger.readHeaderAndReturnStream(fs, path, ServerConfiguration.getSiteConfiguration());
        } catch (LogHeaderIncompleteException e) {
          log.warn("Could not read header for " + path + ". Ignoring...");
          continue;
        }
        DataInputStream input = streams.getDecryptingInputStream();

        try {
          while (true) {
            try {
              key.readFields(input);
              value.readFields(input);
            } catch (EOFException ex) {
              // End of file is the normal way out of this loop.
              break;
            }
            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
          }
        } finally {
          input.close();
        }
      } else {
        // read the log entries sorted in a map file
        MultiReader input = new MultiReader(fs, path);
        try {
          while (input.next(key, value)) {
            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
          }
        } finally {
          // Release the underlying readers, mirroring the plain-file branch above.
          input.close();
        }
      }
    }
  }

  /**
   * Print one log entry to stdout if it passes the active filters.
   *
   * @param key
   *          the entry's key
   * @param value
   *          the entry's value
   * @param row
   *          if non-null, only mutation events containing exactly this row are printed
   * @param rowMatcher
   *          if non-null, only mutation events containing a row fully matching the pattern are printed
   * @param ke
   *          if non-null, only events of tablets whose DEFINE_TABLET event equals this extent are printed
   * @param tabletIds
   *          ids of the tablets matched so far; updated when a matching DEFINE_TABLET event is seen
   * @param maxMutations
   *          maximum number of mutations printed for the entry
   */
  public static void printLogEvent(LogFileKey key, LogFileValue value, Text row, Matcher rowMatcher, KeyExtent ke, Set<Integer> tabletIds, int maxMutations) {

    if (ke != null) {
      if (key.event == LogEvents.DEFINE_TABLET) {
        if (key.tablet.equals(ke)) {
          tabletIds.add(key.tid);
        } else {
          return;
        }
      } else if (!tabletIds.contains(key.tid)) {
        return;
      }
    }

    if (row != null || rowMatcher != null) {
      if (key.event == LogEvents.MUTATION || key.event == LogEvents.MANY_MUTATIONS) {
        boolean found = false;
        for (Mutation m : value.mutations) {
          if (row != null && new Text(m.getRow()).equals(row)) {
            found = true;
            break;
          }

          if (rowMatcher != null) {
            rowMatcher.reset(new String(m.getRow(), UTF_8));
            if (rowMatcher.matches()) {
              found = true;
              break;
            }
          }
        }

        if (!found)
          return;
      } else {
        // With a row filter active, events that carry no mutations are never printed.
        return;
      }

    }

    System.out.println(key);
    System.out.println(LogFileValue.format(value, maxMutations));
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapConcurrencyTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapPerformanceTest.java
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/NativeMapStressTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java index a679c48,ef6311b..3c20858 --- a/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java +++ b/test/src/main/java/org/apache/accumulo/test/TestBinaryRows.java @@@ -71,98 -72,71 +72,98 @@@ public class TestBinaryRows return l; } - static class Opts extends ClientOnRequiredTable { + public static class Opts extends ClientOnRequiredTable { @Parameter(names="--mode", description="either 'ingest', 'delete', 'randomLookups', 'split', 'verify', 'verifyDeleted'", required=true) - String mode; + public String mode; @Parameter(names="--start", description="the lowest numbered row") - long start = 0; + public long start = 0; @Parameter(names="--count", description="number of rows to ingest", required=true) - long num = 0; + public long num = 0; } - public static void main(String[] args) { - Opts opts = new Opts(); - BatchWriterOpts bwOpts = new BatchWriterOpts(); - ScannerOpts scanOpts = new ScannerOpts(); - opts.parseArgs(TestBinaryRows.class.getName(), args, scanOpts, bwOpts); + public static void runTest(Connector connector, Opts opts, BatchWriterOpts bwOpts, ScannerOpts scanOpts) throws Exception { - try { - Connector connector = opts.getConnector(); - - final Text CF = new Text("cf"), CQ = new Text("cq"); - final byte[] CF_BYTES = "cf".getBytes(UTF_8), CQ_BYTES = "cq".getBytes(UTF_8); + final Text CF = new Text("cf"), CQ = new Text("cq"); - final byte[] CF_BYTES = "cf".getBytes(Constants.UTF8), CQ_BYTES = 
"cq".getBytes(Constants.UTF8); ++ final byte[] CF_BYTES = "cf".getBytes(UTF_8), CQ_BYTES = "cq".getBytes(UTF_8); + if (opts.mode.equals("ingest") || opts.mode.equals("delete")) { + BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig()); + boolean delete = opts.mode.equals("delete"); - if (opts.mode.equals("ingest") || opts.mode.equals("delete")) { - BatchWriter bw = connector.createBatchWriter(opts.tableName, bwOpts.getBatchWriterConfig()); - boolean delete = opts.mode.equals("delete"); - - for (long i = 0; i < opts.num; i++) { - byte[] row = encodeLong(i + opts.start); - String value = "" + (i + opts.start); - - Mutation m = new Mutation(new Text(row)); - if (delete) { - m.putDelete(CF, CQ); - } else { - m.put(CF, CQ, new Value(value.getBytes(UTF_8))); - } - bw.addMutation(m); + for (long i = 0; i < opts.num; i++) { + byte[] row = encodeLong(i + opts.start); + String value = "" + (i + opts.start); + + Mutation m = new Mutation(new Text(row)); + if (delete) { + m.putDelete(CF, CQ); + } else { - m.put(CF, CQ, new Value(value.getBytes(Constants.UTF8))); ++ m.put(CF, CQ, new Value(value.getBytes(UTF_8))); } - - bw.close(); - } else if (opts.mode.equals("verifyDeleted")) { - Scanner s = connector.createScanner(opts.tableName, opts.auths); - s.setBatchSize(scanOpts.scanBatchSize); - Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); - Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); - s.setBatchSize(50000); - s.setRange(new Range(startKey, stopKey)); - - for (Entry<Key,Value> entry : s) { - System.err.println("ERROR : saw entries in range that should be deleted ( first value : " + entry.getValue().toString() + ")"); - System.err.println("exiting..."); - System.exit(1); - } - - } else if (opts.mode.equals("verify")) { - long t1 = System.currentTimeMillis(); + bw.addMutation(m); + } + + bw.close(); + } else if 
(opts.mode.equals("verifyDeleted")) { + Scanner s = connector.createScanner(opts.tableName, opts.auths); + s.setBatchSize(scanOpts.scanBatchSize); + Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); + Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); + s.setBatchSize(50000); + s.setRange(new Range(startKey, stopKey)); + + for (Entry<Key,Value> entry : s) { + throw new Exception("ERROR : saw entries in range that should be deleted ( first value : " + entry.getValue().toString() + ")"); + } + + } else if (opts.mode.equals("verify")) { + long t1 = System.currentTimeMillis(); + + Scanner s = connector.createScanner(opts.tableName, opts.auths); + Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); + Key stopKey = new Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); + s.setBatchSize(scanOpts.scanBatchSize); + s.setRange(new Range(startKey, stopKey)); + + long i = opts.start; + + for (Entry<Key,Value> e : s) { + Key k = e.getKey(); + Value v = e.getValue(); + + checkKeyValue(i, k, v); + + i++; + } + + if (i != opts.start + opts.num) { + throw new Exception("ERROR : did not see expected number of rows, saw " + (i - opts.start) + " expected " + opts.num); + } + + long t2 = System.currentTimeMillis(); + + System.out.printf("time : %9.2f secs%n", ((t2 - t1) / 1000.0)); + System.out.printf("rate : %9.2f entries/sec%n", opts.num / ((t2 - t1) / 1000.0)); + + } else if (opts.mode.equals("randomLookups")) { + int numLookups = 1000; + + Random r = new Random(); + + long t1 = System.currentTimeMillis(); + + for (int i = 0; i < numLookups; i++) { + long row = ((r.nextLong() & 0x7fffffffffffffffl) % opts.num) + opts.start; Scanner s = connector.createScanner(opts.tableName, opts.auths); - Key startKey = new Key(encodeLong(opts.start), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); - Key stopKey = new 
Key(encodeLong(opts.start + opts.num - 1), CF_BYTES, CQ_BYTES, new byte[0], 0); s.setBatchSize(scanOpts.scanBatchSize); + Key startKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], Long.MAX_VALUE); + Key stopKey = new Key(encodeLong(row), CF_BYTES, CQ_BYTES, new byte[0], 0); s.setRange(new Range(startKey, stopKey)); - long i = opts.start; + Iterator<Entry<Key,Value>> si = s.iterator(); - for (Entry<Key,Value> e : s) { + if (si.hasNext()) { + Entry<Key,Value> e = si.next(); Key k = e.getKey(); Value v = e.getValue(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestIngest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TestIngest.java index bc3ff05,5d8e268..c3e77af --- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java @@@ -16,16 -16,14 +16,17 @@@ */ package org.apache.accumulo.test; + import static com.google.common.base.Charsets.UTF_8; + +import java.io.IOException; import java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.TreeSet; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java index 7a848a3,125ef5f..16f0b3f --- a/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java +++ 
b/test/src/main/java/org/apache/accumulo/test/TestMultiTableIngest.java @@@ -16,11 -16,11 +16,12 @@@ */ package org.apache.accumulo.test; + import static com.google.common.base.Charsets.UTF_8; + import java.util.ArrayList; +import java.util.List; import java.util.Map.Entry; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.BatchWriterOpts; import org.apache.accumulo.core.cli.ScannerOpts; import org.apache.accumulo.core.client.AccumuloException; @@@ -101,8 -97,8 +102,8 @@@ public class TestMultiTableIngest // populate for (int i = 0; i < opts.count; i++) { - Mutation m = new Mutation(new Text(String.format("%05d", i))); + Mutation m = new Mutation(new Text(String.format("%06d", i))); - m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(Constants.UTF8))); + m.put(new Text("col" + Integer.toString((i % 3) + 1)), new Text("qual"), new Value("junk".getBytes(UTF_8))); b.getBatchWriter(tableNames.get(i % tableNames.size())).addMutation(m); } try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousMoru.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousVerify.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java index 6082ebe,5962220..4f88c1b --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousWalk.java @@@ -26,7 -28,7 +28,6 @@@ import java.util.Map.Entry import 
java.util.Random; import java.util.zip.CRC32; --import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java index 4bd07ca,6124a3d..11d19c5 --- a/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/UndefinedAnalyzer.java @@@ -82,9 -82,9 +83,9 @@@ public class UndefinedAnalyzer parseLog(log); } } - + private void parseLog(File log) throws Exception { - BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(log), Constants.UTF8)); + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(log), UTF_8)); String line; TreeMap<Long,Long> tm = null; try { @@@ -173,10 -172,10 +174,10 @@@ SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss,SSS yyyy MM"); String currentYear = (Calendar.getInstance().get(Calendar.YEAR)) + ""; String currentMonth = (Calendar.getInstance().get(Calendar.MONTH) + 1) + ""; - + for (File masterLog : masterLogs) { - + - BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(masterLog), Constants.UTF8)); + BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(masterLog), UTF_8)); String line; try { while ((line = reader.readLine()) != null) { @@@ -256,11 -252,11 +257,11 @@@ public static void main(String[] args) throws Exception { Opts opts = new Opts(); BatchScannerOpts bsOpts = new BatchScannerOpts(); - opts.parseArgs(UndefinedAnalyzer.class.getName(), args, opts); - + opts.parseArgs(UndefinedAnalyzer.class.getName(), args, 
bsOpts); + List<UndefinedNode> undefs = new ArrayList<UndefinedNode>(); - + - BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, Constants.UTF8)); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, UTF_8)); String line; while ((line = reader.readLine()) != null) { String[] tokens = line.split("\\s"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index f26c8d7,166e2ad..949fc52 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@@ -16,6 -16,10 +16,8 @@@ */ package org.apache.accumulo.test.functional; + import static com.google.common.base.Charsets.UTF_8; + -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Random; http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java index cb17340,33ef0b5..72efee3 --- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/MetadataBatchScanTest.java @@@ -24,7 -26,7 +26,6 @@@ import java.util.Random import java.util.TreeSet; import java.util.UUID; --import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; @@@ -104,10 -100,10 +105,10 @@@ public class 
MetadataBatchScanTest String dir = "/t-" + UUID.randomUUID(); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(Constants.UTF8))); - Constants.METADATA_DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(UTF_8))); ++ TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mut, new Value(dir.getBytes(UTF_8))); for (int i = 0; i < 5; i++) { - mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(Constants.UTF8))); - mut.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(UTF_8))); ++ mut.put(DataFileColumnFamily.NAME, new Text(dir + "/00000_0000" + i + ".map"), new Value("10000,1000000".getBytes(UTF_8))); } bw.addMutation(mut); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java index 5756934,0191b50..e4ce3f3 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/Module.java @@@ -41,9 -37,7 +43,8 @@@ import javax.xml.parsers.DocumentBuilde import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.log4j.Level; import org.w3c.dom.Document; import org.w3c.dom.Element; @@@ -201,163 -193,109 +202,163 @@@ public class Module extends Node if (fixture != null) { fixture.setUp(state); } - - Node initNode = getNode(initNodeId); - - boolean test = false; - if (initNode instanceof Test) { - startTimer(initNode); - test = true; - } - initNode.visit(state, getProps(initNodeId)); - if (test) - stopTimer(initNode); - 
- state.visitedNode(); - // update aliases - Set<String> aliases; - if ((aliases = aliasMap.get(initNodeId)) != null) - for (String alias : aliases) { - ((Alias) nodes.get(alias)).update(initNodeId); - } - - String curNodeId = initNodeId; - int numHops = 0; - long startTime = System.currentTimeMillis() / 1000; - while (true) { - // check if END state was reached - if (curNodeId.equalsIgnoreCase("END")) { - log.debug("reached END state"); - break; - } - // check if maxSec was reached - long curTime = System.currentTimeMillis() / 1000; - if ((curTime - startTime) > maxSec) { - log.debug("reached maxSec(" + maxSec + ")"); - break; - } - // check if maxHops was reached - if (numHops > maxHops) { - log.debug("reached maxHops(" + maxHops + ")"); - break; - } - numHops++; - - if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) { - throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")"); - } - AdjList adj = adjMap.get(curNodeId); - String nextNodeId = adj.randomNeighbor(); - Node nextNode = getNode(nextNodeId); - if (nextNode instanceof Alias) { - nextNodeId = ((Alias) nextNode).getTargetId(); - nextNode = ((Alias) nextNode).get(); + + ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner"); + + try { + Node initNode = getNode(initNodeId); + + boolean test = false; + if (initNode instanceof Test) { + startTimer(initNode); + test = true; } - Properties nodeProps = getProps(nextNodeId); - try { - test = false; - if (nextNode instanceof Test) { - startTimer(nextNode); - test = true; + initNode.visit(state, getProps(initNodeId)); + if (test) + stopTimer(initNode); + + state.visitedNode(); + // update aliases + Set<String> aliases; + if ((aliases = aliasMap.get(initNodeId)) != null) + for (String alias : aliases) { + ((Alias) nodes.get(alias)).update(initNodeId); } - nextNode.visit(state, nodeProps); - if (test) - stopTimer(nextNode); - } catch (Exception e) { - log.debug("Connector belongs to user: 
" + state.getConnector().whoami()); - log.debug("Exception occured at: " + System.currentTimeMillis()); - log.debug("Properties for node: " + nextNodeId); - for (Entry<Object,Object> entry : nodeProps.entrySet()) { - log.debug(" " + entry.getKey() + ": " + entry.getValue()); + + String curNodeId = initNodeId; + int numHops = 0; + long startTime = System.currentTimeMillis() / 1000; + while (true) { + // check if END state was reached + if (curNodeId.equalsIgnoreCase("END")) { + log.debug("reached END state"); + break; } - log.debug("Overall Properties"); - for (Entry<Object,Object> entry : state.getProperties().entrySet()) { - log.debug(" " + entry.getKey() + ": " + entry.getValue()); + // check if maxSec was reached + long curTime = System.currentTimeMillis() / 1000; + if ((curTime - startTime) > maxSec) { + log.debug("reached maxSec(" + maxSec + ")"); + break; } - log.debug("State information"); - for (String key : new TreeSet<String>(state.getMap().keySet())) { - Object value = state.getMap().get(key); - String logMsg = " " + key + ": "; - if (value == null) - logMsg += "null"; - else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number) - logMsg += value; - else if (value instanceof byte[]) - logMsg += new String((byte[])value, UTF_8); - else if (value instanceof PasswordToken) - logMsg += new String(((PasswordToken) value).getPassword(), UTF_8); - else - logMsg += value.getClass()+ " - " + value; - - log.debug(logMsg); + + // The number of seconds before the test should exit + long secondsRemaining = maxSec - (curTime - startTime); + + // check if maxHops was reached + if (numHops > maxHops) { + log.debug("reached maxHops(" + maxHops + ")"); + break; } - throw new Exception("Error running node " + nextNodeId, e); - } - state.visitedNode(); - - // update aliases - if ((aliases = aliasMap.get(curNodeId)) != null) - for (String alias : aliases) { - ((Alias) nodes.get(alias)).update(curNodeId); + numHops++; + 
+ if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) { + throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")"); + } + AdjList adj = adjMap.get(curNodeId); + String nextNodeId = adj.randomNeighbor(); + final Node nextNode; + Node nextNodeOrAlias = getNode(nextNodeId); + if (nextNodeOrAlias instanceof Alias) { + nextNodeId = ((Alias) nextNodeOrAlias).getTargetId(); + nextNode = ((Alias) nextNodeOrAlias).get(); + } else { + nextNode = nextNodeOrAlias; + } + final Properties nodeProps = getProps(nextNodeId); + try { + test = false; + if (nextNode instanceof Test) { + startTimer(nextNode); + test = true; + } + + // Wrap the visit of the next node in the module in a callable that returns a thrown exception + FutureTask<Exception> task = new FutureTask<Exception>(new Callable<Exception>() { + + @Override + public Exception call() throws Exception { + try { + nextNode.visit(state, nodeProps); + return null; + } catch (Exception e) { + return e; + } + } + + }); + + // Run the task (should execute immediately) + service.submit(task); + + Exception nodeException; + try { + // Bound the time we'll wait for the node to complete + nodeException = task.get(secondsRemaining, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e); + break; + } catch (ExecutionException e) { + log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e); + throw e; + } catch (TimeoutException e) { + log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). 
Exiting.", e); + break; + } + + // The RandomWalk node throw an Exception that that Callable handed back + // Throw it and let the Module perform cleanup + if (null != nodeException) { + throw nodeException; + } + + if (test) + stopTimer(nextNode); + } catch (Exception e) { + log.debug("Connector belongs to user: " + state.getConnector().whoami()); + log.debug("Exception occured at: " + System.currentTimeMillis()); + log.debug("Properties for node: " + nextNodeId); + for (Entry<Object,Object> entry : nodeProps.entrySet()) { + log.debug(" " + entry.getKey() + ": " + entry.getValue()); + } + log.debug("Overall Properties"); + for (Entry<Object,Object> entry : state.getProperties().entrySet()) { + log.debug(" " + entry.getKey() + ": " + entry.getValue()); + } + log.debug("State information"); + for (String key : new TreeSet<String>(state.getMap().keySet())) { + Object value = state.getMap().get(key); + String logMsg = " " + key + ": "; + if (value == null) + logMsg += "null"; + else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number) + logMsg += value; + else if (value instanceof byte[]) - logMsg += new String((byte[]) value, Constants.UTF8); ++ logMsg += new String((byte[]) value, UTF_8); + else if (value instanceof PasswordToken) - logMsg += new String(((PasswordToken) value).getPassword(), Constants.UTF8); ++ logMsg += new String(((PasswordToken) value).getPassword(), UTF_8); + else + logMsg += value.getClass() + " - " + value; + + log.debug(logMsg); + } + throw new Exception("Error running node " + nextNodeId, e); } - - curNodeId = nextNodeId; + state.visitedNode(); + + // update aliases + if ((aliases = aliasMap.get(curNodeId)) != null) + for (String alias : aliases) { + ((Alias) nodes.get(alias)).update(curNodeId); + } + + curNodeId = nextNodeId; + } + } finally { + if (null != service) { + service.shutdownNow(); + } } - + if (teardown && (fixture != null)) { log.debug("tearing down module"); 
fixture.tearDown(state); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java index 1704e49,05cc4f0..67438d9 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkMinusOne.java @@@ -20,10 -21,10 +21,10 @@@ import static com.google.common.base.Ch import org.apache.accumulo.core.data.Value; import org.apache.accumulo.test.randomwalk.State; -public class BulkMinusOne extends BulkTest { - +public class BulkMinusOne extends BulkImportTest { + - private static final Value negOne = new Value("-1".getBytes(Constants.UTF8)); + private static final Value negOne = new Value("-1".getBytes(UTF_8)); - + @Override protected void runLater(State state) throws Exception { log.info("Decrementing"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java index f002274,7f9a218..3dcff6e --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BatchWrite.java @@@ -51,10 -52,10 +52,10 @@@ public class BatchWrite extends Test try { int numRows = rand.nextInt(100000); for (int i = 0; i < numRows; i++) { - Mutation m = new Mutation(String.format("%016x", (rand.nextLong() & 0x7fffffffffffffffl))); - long val = (rand.nextLong() & 0x7fffffffffffffffl); + Mutation m = new Mutation(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl)); + long val = rand.nextLong() & 0x7fffffffffffffffl; for (int j = 
0; j < 10; j++) { - m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(Constants.UTF8))); + m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(UTF_8))); } bw.addMutation(m); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java index c8da3d8,c227a11..b3519b6 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/StopTabletServer.java @@@ -47,8 -50,8 +49,8 @@@ public class StopTabletServer extends T Collections.sort(children); Stat stat = new Stat(); byte[] data = rdr.getData(base + "/" + child + "/" + children.get(0), stat); - if (!"master".equals(new String(data, Constants.UTF8))) { + if (!"master".equals(new String(data, UTF_8))) { - result.add(new TServerInstance(AddressUtil.parseAddress(child, Property.TSERV_CLIENTPORT), stat.getEphemeralOwner())); + result.add(new TServerInstance(AddressUtil.parseAddress(child, false), stat.getEphemeralOwner())); } } } catch (KeeperException.NoNodeException ex) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/image/Write.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/security/SecurityHelper.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/sequential/Write.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/main/java/org/apache/accumulo/test/randomwalk/shard/BulkInsert.java ----------------------------------------------------------------------