Repository: incubator-tephra Updated Branches: refs/heads/master c12a14618 -> 03dde2b40
(TEPHRA-243) Improve logging for slow log append and fix concurrency issues in transaction log writer This closes #53 from Github. Signed-off-by: anew <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/03dde2b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/03dde2b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/03dde2b4 Branch: refs/heads/master Commit: 03dde2b40a34a534870c65d91aac255feeb4e5af Parents: c12a146 Author: anew <[email protected]> Authored: Fri Sep 8 20:04:38 2017 -0700 Committer: anew <[email protected]> Committed: Wed Sep 13 17:50:38 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/tephra/TxConstants.java | 9 + .../tephra/persist/AbstractTransactionLog.java | 223 ++++++++++++------- .../tephra/persist/HDFSTransactionLog.java | 57 ++--- .../tephra/persist/LocalFileTransactionLog.java | 14 +- .../LocalFileTransactionStateStorage.java | 6 +- .../tephra/persist/TransactionLogWriter.java | 5 + .../tephra/ThriftTransactionSystemTest.java | 7 +- .../tephra/persist/HDFSTransactionLogTest.java | 1 + 8 files changed, 206 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index 3a6b70a..bb4b139 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -384,6 +384,15 @@ public class TxConstants { public static final String NUM_ENTRIES_APPENDED = "count"; public static final String VERSION_KEY = "version"; public static final byte CURRENT_VERSION = 3; + + /** + * Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". + */ + public static final String CFG_SLOW_APPEND_THRESHOLD = "data.tx.log.slow.append.threshold"; + /** + * Default value for the threshold in milli seconds for slow log append warnings. + */ + public static final long DEFAULT_SLOW_APPEND_THRESHOLD = 1000; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java index cf97c92..eba5a1f 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java @@ -19,9 +19,11 @@ package org.apache.tephra.persist; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; +import com.google.common.base.Stopwatch; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import org.apache.tephra.TxConstants; import org.apache.tephra.metrics.MetricsCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,17 +31,20 @@ import org.slf4j.LoggerFactory; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; /** * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}. + * + * It is important to call close() on this class to ensure that all writes are synced and the log files are closed. */ public abstract class AbstractTransactionLog implements TransactionLog { - /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */ - private static final long SLOW_APPEND_THRESHOLD = 1000L; private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class); @@ -47,22 +52,32 @@ public abstract class AbstractTransactionLog implements TransactionLog { private final MetricsCollector metricsCollector; protected long timestamp; private volatile boolean initialized; + private volatile boolean closing; private volatile boolean closed; - private AtomicLong syncedUpTo = new AtomicLong(); - private List<Entry> pendingWrites = Lists.newLinkedList(); + private long writtenUpTo = 0L; + private volatile long syncedUpTo = 0L; + private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>(); private TransactionLogWriter writer; - public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) { + private int countSinceLastSync = 0; + private long positionBeforeWrite = -1L; + private final Stopwatch stopWatch = new Stopwatch(); + + private final long slowAppendThreshold; + + AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector, Configuration conf) { this.timestamp = timestamp; this.metricsCollector = metricsCollector; + this.slowAppendThreshold = conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, + TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD); } /** - * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they - * also call {@link HDFSTransactionLog#close()}. + * Initializes the log file, opening a file writer. + * * @throws java.io.IOException If an error is encountered initializing the file writer. */ - public synchronized void init() throws IOException { + private synchronized void init() throws IOException { if (initialized) { return; } @@ -85,105 +100,147 @@ public abstract class AbstractTransactionLog implements TransactionLog { @Override public void append(TransactionEdit edit) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); - } - - // wait for sync to complete - sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } + append(Collections.singletonList(edit)); } @Override public void append(List<TransactionEdit> edits) throws IOException { - long startTime = System.nanoTime(); - synchronized (this) { - ensureAvailable(); - + if (closing) { // or closed, which implies closing + throw new IOException("Log " + getName() + " is closing or already closed, cannot append"); + } + if (!initialized) { + init(); + } + // synchronizing here ensures that elements in the queue are ordered by seq number + synchronized (logSequence) { for (TransactionEdit edit : edits) { - Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit); - - // add to pending edits - append(entry); + pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit)); } } - - // wait for sync to complete + // try to sync all pending edits (competing for this with other threads) sync(); - long durationMillis = (System.nanoTime() - startTime) / 1000000L; - if (durationMillis > SLOW_APPEND_THRESHOLD) { - LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec."); - } } - private void ensureAvailable() throws IOException { - if (closed) { - throw new IOException("Log " + getName() + " is already closed, cannot append!"); - } - if (!initialized) { - init(); + /** + * Return all pending writes at the time the method is called, or null if no writes are pending. + * + * Note that after this method returns, there can be additional pending writes, + * added concurrently while the existing pending writes are removed. + */ + @Nullable + private Entry[] getPendingWrites() { + synchronized (this) { + if (pendingWrites.isEmpty()) { + return null; + } + Entry[] entriesToSync = new Entry[pendingWrites.size()]; + for (int i = 0; i < entriesToSync.length; i++) { + entriesToSync[i] = pendingWrites.remove(); + } + return entriesToSync; } } - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. + /** + * When multiple threads try to log edits at the same time, they all will call (@link #append} + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()} + * are followed by a single {@code sync}, or vice versa. + * + * We want to record the time and position of the first {@code append()} after a {@code sync()}, + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold. + * Therefore this is called every time before we write the pending list out to the log writer. + * + * See {@link #stopTimer(TransactionLogWriter)}. + * + * @throws IOException if the position of the writer cannot be determined */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException { + // no sync needed because this is only called within a sync block + if (positionBeforeWrite == -1L) { + positionBeforeWrite = writer.getPosition(); + countSinceLastSync = 0; + stopWatch.reset().start(); + } + countSinceLastSync += entryCount; } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List<Entry> getPendingWrites() { - synchronized (this) { - List<Entry> save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; + /** + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync + * together exceed a threshold. + * + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}. + * + * @throws IOException if the position of the writer cannot be determined + */ + private void stopTimer(TransactionLogWriter writer) throws IOException { + // this method is only called by a thread if it actually called sync(), inside a sync block + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case + stopWatch.stop(); + long elapsed = stopWatch.elapsedMillis(); + long bytesWritten = writer.getPosition() - positionBeforeWrite; + if (elapsed >= slowAppendThreshold) { + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten); + } + metricsCollector.histogram("wal.sync.size", countSinceLastSync); + metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int } + positionBeforeWrite = -1L; + countSinceLastSync = 0; } private void sync() throws IOException { // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; long latestSeq = 0; int entryCount = 0; synchronized (this) { if (closed) { - return; - } - // prevent writer being dereferenced - tmpWriter = writer; - - List<Entry> currentPending = getPendingWrites(); - if (!currentPending.isEmpty()) { - tmpWriter.commitMarker(currentPending.size()); + if (pendingWrites.isEmpty()) { + // this expected: close() sets closed to true after syncing all pending writes (including ours) + return; + } + // this should never happen because close() only sets closed=true after syncing. + // but if it should happen, we must fail this call because we don't know whether the edit was persisted + throw new IOException( + "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted"); } - - // write out all accumulated entries to log. - for (Entry e : currentPending) { - tmpWriter.append(e); - entryCount++; - latestSeq = Math.max(latestSeq, e.getKey().get()); + Entry[] currentPending = getPendingWrites(); + if (currentPending != null) { + entryCount = currentPending.length; + startTimerIfNeeded(writer, entryCount); + writer.commitMarker(entryCount); + for (Entry e : currentPending) { + writer.append(e); + } + // sequence are guaranteed to be ascending, so the last one is the greatest + latestSeq = currentPending[currentPending.length - 1].getKey().get(); + writtenUpTo = latestSeq; } } - long lastSynced = syncedUpTo.get(); + // giving up the sync lock here allows other threads to write their edits before the sync happens. + // hence, we can have the edits from n threads in one sync. + // someone else might have already synced our edits, avoid double syncing - if (lastSynced < latestSeq) { - tmpWriter.sync(); - metricsCollector.histogram("wal.sync.size", entryCount); - syncedUpTo.compareAndSet(lastSynced, latestSeq); + // Note: latestSeq is a local variable and syncedUpTo is volatile; hence this is safe without synchronization + if (syncedUpTo >= latestSeq) { + return; + } + synchronized (this) { + // check again - someone else might have synced our edits while we were waiting to synchronize + if (syncedUpTo >= latestSeq) { + return; + } + if (closed) { + // this should never happen because close() only sets closed=true after syncing. + // but if it should happen, we must fail this call because we don't know whether the edit was persisted + throw new IOException(String.format( + "Unexpected state: Writer is closed but there are unsynced edits up to sequence id %d, and writes have " + + "been synced up to sequence id %d. Cannot guarantee that edits are persisted.", latestSeq, syncedUpTo)); + } + writer.sync(); + syncedUpTo = writtenUpTo; + stopTimer(writer); } } @@ -192,6 +249,9 @@ public abstract class AbstractTransactionLog implements TransactionLog { if (closed) { return; } + // prevent other threads from adding more edits to the pending queue + closing = true; + // perform a final sync if any outstanding writes if (!pendingWrites.isEmpty()) { sync(); @@ -251,20 +311,21 @@ public abstract class AbstractTransactionLog implements TransactionLog { } // package private for testing + @SuppressWarnings("deprecation") @Deprecated @VisibleForTesting static class CaskEntry implements Writable { private LongWritable key; private co.cask.tephra.persist.TransactionEdit edit; - // for Writable + @SuppressWarnings("unused") public CaskEntry() { this.key = new LongWritable(); this.edit = new co.cask.tephra.persist.TransactionEdit(); } - public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) { + CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) { this.key = key; this.edit = edit; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java index ba781ac..0d9b235 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLog.java @@ -50,9 +50,9 @@ public class HDFSTransactionLog extends AbstractTransactionLog { * @param hConf HDFS cluster configuration. * @param logPath Path to the log file. */ - public HDFSTransactionLog(final FileSystem fs, final Configuration hConf, - final Path logPath, long timestamp, MetricsCollector metricsCollector) { - super(timestamp, metricsCollector); + HDFSTransactionLog(final FileSystem fs, final Configuration hConf, + final Path logPath, long timestamp, MetricsCollector metricsCollector) { + super(timestamp, metricsCollector, hConf); this.fs = fs; this.hConf = hConf; this.logPath = logPath; @@ -73,7 +73,7 @@ public class HDFSTransactionLog extends AbstractTransactionLog { FileStatus status = fs.getFileStatus(logPath); long length = status.getLen(); - TransactionLogReader reader = null; + TransactionLogReader reader; // check if this file needs to be recovered due to failure // Check for possibly empty file. With appends, currently Hadoop reports a // zero length even if the file has been sync'd. Revisit if HDFS-376 or @@ -82,38 +82,36 @@ public class HDFSTransactionLog extends AbstractTransactionLog { LOG.warn("File " + logPath + " might be still open, length is 0"); } + HDFSUtil hdfsUtil = new HDFSUtil(); + hdfsUtil.recoverFileLease(fs, logPath, hConf); try { - HDFSUtil hdfsUtil = new HDFSUtil(); - hdfsUtil.recoverFileLease(fs, logPath, hConf); - try { - FileStatus newStatus = fs.getFileStatus(logPath); - LOG.info("New file size for " + logPath + " is " + newStatus.getLen()); - SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf); - reader = new HDFSTransactionLogReaderSupplier(fileReader).get(); - } catch (EOFException e) { - if (length <= 0) { - // TODO should we ignore an empty, not-last log file if skip.errors - // is false? Either way, the caller should decide what to do. E.g. - // ignore if this is the last log in sequence. - // TODO is this scenario still possible if the log has been - // recovered (i.e. closed) - LOG.warn("Could not open " + logPath + " for reading. File is empty", e); - return null; - } else { - // EOFException being ignored - return null; - } + FileStatus newStatus = fs.getFileStatus(logPath); + LOG.info("New file size for " + logPath + " is " + newStatus.getLen()); + SequenceFile.Reader fileReader = new SequenceFile.Reader(fs, logPath, hConf); + reader = new HDFSTransactionLogReaderSupplier(fileReader).get(); + } catch (EOFException e) { + if (length <= 0) { + // TODO should we ignore an empty, not-last log file if skip.errors + // is false? Either way, the caller should decide what to do. E.g. + // ignore if this is the last log in sequence. + // TODO is this scenario still possible if the log has been + // recovered (i.e. closed) + LOG.warn("Could not open " + logPath + " for reading. File is empty", e); + return null; + } else { + // EOFException being ignored + return null; } - } catch (IOException e) { - throw e; } return reader; } @VisibleForTesting static final class LogWriter implements TransactionLogWriter { + private final SequenceFile.Writer internalWriter; - public LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException { + + LogWriter(FileSystem fs, Configuration hConf, Path logPath) throws IOException { // TODO: retry a few times to ride over transient failures? SequenceFile.Metadata metadata = new SequenceFile.Metadata(); metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY), @@ -125,6 +123,11 @@ public class HDFSTransactionLog extends AbstractTransactionLog { } @Override + public long getPosition() throws IOException { + return internalWriter.getLength(); + } + + @Override public void append(Entry entry) throws IOException { internalWriter.append(entry.getKey(), entry.getEdit()); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java index d81ba38..416aae7 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionLog.java @@ -18,6 +18,7 @@ package org.apache.tephra.persist; +import org.apache.hadoop.conf.Configuration; import org.apache.tephra.metrics.MetricsCollector; import java.io.BufferedInputStream; @@ -40,8 +41,8 @@ public class LocalFileTransactionLog extends AbstractTransactionLog { * Creates a new transaction log using the given file instance. * @param logFile The log file to use. */ - public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) { - super(timestamp, metricsCollector); + LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector, Configuration conf) { + super(timestamp, metricsCollector, conf); this.logFile = logFile; } @@ -64,12 +65,17 @@ public class LocalFileTransactionLog extends AbstractTransactionLog { private final FileOutputStream fos; private final DataOutputStream out; - public LogWriter(File logFile) throws IOException { + LogWriter(File logFile) throws IOException { this.fos = new FileOutputStream(logFile); this.out = new DataOutputStream(new BufferedOutputStream(fos, LocalFileTransactionStateStorage.BUFFER_SIZE)); } @Override + public long getPosition() throws IOException { + return fos.getChannel().position(); + } + + @Override public void append(Entry entry) throws IOException { entry.write(out); } @@ -97,7 +103,7 @@ public class LocalFileTransactionLog extends AbstractTransactionLog { private final DataInputStream in; private Entry reuseEntry = new Entry(); - public LogReader(File logFile) throws IOException { + LogReader(File logFile) throws IOException { this.fin = new FileInputStream(logFile); this.in = new DataInputStream(new BufferedInputStream(fin, LocalFileTransactionStateStorage.BUFFER_SIZE)); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java index beddbb2..3edb909 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/LocalFileTransactionStateStorage.java @@ -61,6 +61,7 @@ public class LocalFileTransactionStateStorage extends AbstractTransactionStateSt }; private final String configuredSnapshotDir; + private final Configuration conf; private final MetricsCollector metricsCollector; private File snapshotDir; @@ -69,6 +70,7 @@ public class LocalFileTransactionStateStorage extends AbstractTransactionStateSt MetricsCollector metricsCollector) { super(codecProvider); this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR); + this.conf = conf; this.metricsCollector = metricsCollector; } @@ -220,7 +222,7 @@ public class LocalFileTransactionStateStorage extends AbstractTransactionStateSt @Nullable @Override public TransactionLog apply(@Nullable TimestampedFilename input) { - return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector); + return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector, conf); } }); } @@ -229,7 +231,7 @@ public class LocalFileTransactionStateStorage extends AbstractTransactionStateSt public TransactionLog createLog(long timestamp) throws IOException { File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp); LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath()); - return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector); + return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector, conf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java index 14893ac..ec69548 100644 --- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java +++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionLogWriter.java @@ -43,6 +43,11 @@ public interface TransactionLogWriter extends Closeable { void commitMarker(int count) throws IOException; /** + * @return the current position in the output. + */ + long getPosition() throws IOException; + + /** * Syncs any pending transaction edits added through {@link #append(AbstractTransactionLog.Entry)}, * but not yet flushed to durable storage. * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java index 757b620..e295b95 100644 --- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java @@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tephra.distributed.RetryNTimes; import org.apache.tephra.distributed.RetryStrategy; import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.LocalFileTransactionStateStorage; import org.apache.tephra.persist.TransactionStateStorage; import org.apache.tephra.runtime.ConfigModule; import org.apache.tephra.runtime.DiscoveryModules; @@ -52,6 +52,7 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ThriftTransactionSystemTest extends TransactionSystemTest { @@ -79,6 +80,8 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { // we want to use a retry strategy that lets us query the number of times it retried: conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, CountingRetryStrategyProvider.class.getName()); conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 2); + conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) TimeUnit.DAYS.toSeconds(5)); // very long limit + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, tmpFolder.newFolder().toString()); Injector injector = Guice.createInjector( new ConfigModule(conf), @@ -88,7 +91,7 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest { .with(new AbstractModule() { @Override protected void configure() { - bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON); + bind(TransactionStateStorage.class).to(LocalFileTransactionStateStorage.class).in(Scopes.SINGLETON); bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON); } }), http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/03dde2b4/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java index 7a34e55..f53264b 100644 --- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java @@ -191,6 +191,7 @@ public class HDFSTransactionLogTest { List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount); long timestamp = System.currentTimeMillis(); Configuration configuration = getConfiguration(); + configuration.set(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, "0"); FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration); SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber); AtomicLong logSequence = new AtomicLong();
