http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java new file mode 100644 index 0000000..cfefc83 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java @@ -0,0 +1,1398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractService; +import com.google.inject.Inject; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.metrics.DefaultMetricsCollector; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.persist.NoOpTransactionStateStorage; +import org.apache.tephra.persist.TransactionEdit; +import org.apache.tephra.persist.TransactionLog; +import org.apache.tephra.persist.TransactionLogReader; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * This is the central place to manage all active transactions in the system. 
+ * + * A transaction consists of + * <ul> + * <li>A write pointer: This is the version used for all writes of that transaction.</li> + * <li>A read pointer: All reads under the transaction use this as an upper bound for the version.</li> + * <li>A set of excluded versions: These are the write versions of other transactions that must be excluded from + * reads, because those transactions are still in progress, or they failed but couldn't be properly rolled back.</li> + * </ul> + * To use the transaction system, a client must follow this sequence of steps: + * <ol> + * <li>Request a new transaction.</li> + * <li>Use the transaction to read and write datasets. Datasets are encouraged to cache the writes of the + * transaction in memory, to reduce the cost of rollback in case the transaction fails. </li> + * <li>Check whether the transaction has conflicts. For this, the set of change keys are submitted via canCommit(), + * and the transaction manager verifies that none of these keys are in conflict with other transactions that + * committed since the start of this transaction.</li> + * <li>If the transaction has conflicts: + * <ol> + * <li>Roll back the changes in every dataset that was changed. This can happen in-memory if the + * changes were cached.</li> + * <li>Abort the transaction to remove it from the active transactions.</li> + * </ol> + * <li>If the transaction has no conflicts:</li> + * <ol> + * <li>Persist all datasets changes to storage.</li> + * <li>Commit the transaction. This will repeat the conflict detection, because more overlapping transactions + * may have committed since the first conflict check.</li> + * <li>If the transaction has conflicts:</li> + * <ol> + * <li>Roll back the changes in every dataset that was changed. 
This is more expensive because + * changes must be undone in persistent storage.</li> + * <li>Abort the transaction to remove it from the active transactions.</li> + * </ol> + * </ol> + * </ol> + * Transactions may be short or long-running. A short transaction is started with a timeout, and if it is not + * committed before that timeout, it is invalidated and excluded from future reads. A long-running transaction has + * no timeout and will remain active until it is committed or aborted. Long transactions are typically used in + * map/reduce jobs and can produce enormous amounts of changes. Therefore, long transactions do not participate in + * conflict detection (they would almost always have conflicts). We also assume that the changes of long transactions + * are not tracked, and therefore cannot be rolled back. Hence, when a long transaction is aborted, it remains in the + * list of excluded transactions to make its writes invisible. + */ +public class TransactionManager extends AbstractService { + // todo: optimize heavily + + private static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class); + + // poll every 1 second to check whether a snapshot is needed + private static final long SNAPSHOT_POLL_INTERVAL = 1000L; + + //poll every 10 second to emit metrics + private static final long METRICS_POLL_INTERVAL = 10000L; + + private static final long[] NO_INVALID_TX = { }; + + // Transactions that are in progress, with their info. + private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>(); + + // the list of transactions that are invalid (not properly committed/aborted, or timed out) + // TODO: explain usage of two arrays + private final LongArrayList invalid = new LongArrayList(); + private long[] invalidArray = NO_INVALID_TX; + + // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil) + // todo: should this be consolidated with inProgress? 
+ // commit time next writePointer -> changes made by this tx + private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = + new ConcurrentSkipListMap<Long, Set<ChangeId>>(); + // not committed yet + private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap(); + + private long readPointer; + private long lastWritePointer; + private MetricsCollector txMetricsCollector; + + private final TransactionStateStorage persistor; + + private final int cleanupInterval; + private final int defaultTimeout; + private final int defaultLongTimeout; + private DaemonThreadExecutor cleanupThread = null; + + private volatile TransactionLog currentLog; + + // timestamp of the last completed snapshot + private long lastSnapshotTime; + // frequency in millis to perform snapshots + private final long snapshotFrequencyInSeconds; + // number of most recent snapshots to retain + private final int snapshotRetainCount; + private DaemonThreadExecutor snapshotThread; + private DaemonThreadExecutor metricsThread; + + // lock guarding change of the current transaction log + private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); + private final Lock logReadLock = logLock.readLock(); + private final Lock logWriteLock = logLock.writeLock(); + + // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration + // TODO: REMOVE WITH txnBackwardsCompatCheck() + private final long longTimeoutTolerance; + + public TransactionManager(Configuration config) { + this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new DefaultMetricsCollector()); + } + + @Inject + public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage persistor, + MetricsCollector txMetricsCollector) { + this.persistor = persistor; + cleanupInterval = conf.getInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, + TxConstants.Manager.DEFAULT_TX_CLEANUP_INTERVAL); + defaultTimeout = 
conf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT, + TxConstants.Manager.DEFAULT_TX_TIMEOUT); + defaultLongTimeout = conf.getInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, + TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT); + snapshotFrequencyInSeconds = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, + TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL); + // must always keep at least 1 snapshot + snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN, + TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1); + + // intentionally not using a constant, as this config should not be exposed + // TODO: REMOVE WITH txnBackwardsCompatCheck() + longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000); + + // + this.txMetricsCollector = txMetricsCollector; + this.txMetricsCollector.configure(conf); + clear(); + } + + private void clear() { + invalid.clear(); + invalidArray = NO_INVALID_TX; + inProgress.clear(); + committedChangeSets.clear(); + committingChangeSets.clear(); + lastWritePointer = 0; + readPointer = 0; + lastSnapshotTime = 0; + } + + private boolean isStopping() { + return State.STOPPING.equals(state()); + } + + @Override + public synchronized void doStart() { + LOG.info("Starting transaction manager."); + txMetricsCollector.start(); + // start up the persistor + persistor.startAndWait(); + try { + persistor.setupStorage(); + } catch (IOException e) { + Throwables.propagate(e); + } + // establish defaults in case there is no persistence + clear(); + // attempt to recover state from last run + recoverState(); + // start the periodic cleanup thread + startCleanupThread(); + startSnapshotThread(); + startMetricsThread(); + // initialize the WAL if we did not force a snapshot in recoverState() + initLog(); + // initialize next write pointer if needed + if (lastWritePointer == 0) { + lastWritePointer = getNextWritePointer(); + readPointer = lastWritePointer; + } + + notifyStarted(); + } + + private void initLog() { + 
if (currentLog == null) { + try { + currentLog = persistor.createLog(System.currentTimeMillis()); + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } + } + } + + private void startCleanupThread() { + if (cleanupInterval <= 0 || defaultTimeout <= 0) { + return; + } + LOG.info("Starting periodic timed-out transaction cleanup every " + cleanupInterval + + " seconds with default timeout of " + defaultTimeout + " seconds."); + this.cleanupThread = new DaemonThreadExecutor("tx-clean-timeout") { + @Override + public void doRun() { + cleanupTimedOutTransactions(); + } + + @Override + public long getSleepMillis() { + return cleanupInterval * 1000; + } + }; + cleanupThread.start(); + } + + private void startSnapshotThread() { + if (snapshotFrequencyInSeconds > 0) { + LOG.info("Starting periodic snapshot thread, frequency = " + snapshotFrequencyInSeconds + + " seconds, location = " + persistor.getLocation()); + this.snapshotThread = new DaemonThreadExecutor("tx-snapshot") { + @Override + public void doRun() { + long currentTime = System.currentTimeMillis(); + if (lastSnapshotTime < (currentTime - snapshotFrequencyInSeconds * 1000)) { + try { + doSnapshot(false); + } catch (IOException ioe) { + LOG.error("Periodic snapshot failed!", ioe); + } + } + } + + @Override + protected void onShutdown() { + // perform a final snapshot + try { + LOG.info("Writing final snapshot prior to shutdown"); + doSnapshot(true); + } catch (IOException ioe) { + LOG.error("Failed performing final snapshot on shutdown", ioe); + } + } + + @Override + public long getSleepMillis() { + return SNAPSHOT_POLL_INTERVAL; + } + }; + snapshotThread.start(); + } + } + + // Emits Transaction Data structures size as metrics + private void startMetricsThread() { + LOG.info("Starting periodic Metrics Emitter thread, frequency = " + METRICS_POLL_INTERVAL); + this.metricsThread = new DaemonThreadExecutor("tx-metrics") { + @Override + public void doRun() { + txMetricsCollector.gauge("committing.size", 
committingChangeSets.size()); + txMetricsCollector.gauge("committed.size", committedChangeSets.size()); + txMetricsCollector.gauge("inprogress.size", inProgress.size()); + txMetricsCollector.gauge("invalid.size", invalidArray.length); + } + + @Override + protected void onShutdown() { + // perform a final metrics emit + txMetricsCollector.gauge("committing.size", committingChangeSets.size()); + txMetricsCollector.gauge("committed.size", committedChangeSets.size()); + txMetricsCollector.gauge("inprogress.size", inProgress.size()); + txMetricsCollector.gauge("invalid.size", invalidArray.length); + } + + @Override + public long getSleepMillis() { + return METRICS_POLL_INTERVAL; + } + }; + metricsThread.start(); + } + + private void cleanupTimedOutTransactions() { + List<TransactionEdit> invalidEdits = null; + this.logReadLock.lock(); + try { + synchronized (this) { + if (!isRunning()) { + return; + } + + long currentTime = System.currentTimeMillis(); + List<Long> timedOut = Lists.newArrayList(); + for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) { + long expiration = tx.getValue().getExpiration(); + if (expiration >= 0L && currentTime > expiration) { + // timed out, remember tx id (can't remove while iterating over entries) + timedOut.add(tx.getKey()); + LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey()); + } else if (expiration < 0) { + LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " + + "migrated correctly, this transaction will be expired immediately", + tx.getKey(), expiration); + timedOut.add(tx.getKey()); + } + } + if (!timedOut.isEmpty()) { + invalidEdits = Lists.newArrayListWithCapacity(timedOut.size()); + invalid.addAll(timedOut); + for (long tx : timedOut) { + committingChangeSets.remove(tx); + inProgress.remove(tx); + invalidEdits.add(TransactionEdit.createInvalid(tx)); + } + + // todo: find a more efficient way to keep this sorted. Could it just be an array? 
+ Collections.sort(invalid); + invalidArray = invalid.toLongArray(); + LOG.info("Invalidated {} transactions due to timeout.", timedOut.size()); + } + } + if (invalidEdits != null) { + appendToLog(invalidEdits); + } + } finally { + this.logReadLock.unlock(); + } + } + + public synchronized TransactionSnapshot getSnapshot() throws IOException { + TransactionSnapshot snapshot = null; + if (!isRunning() && !isStopping()) { + return null; + } + + long now = System.currentTimeMillis(); + // avoid duplicate snapshots at same timestamp + if (now == lastSnapshotTime || (currentLog != null && now == currentLog.getTimestamp())) { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException ie) { } + } + // copy in memory state + snapshot = getCurrentState(); + + LOG.debug("Starting snapshot of transaction state with timestamp {}", snapshot.getTimestamp()); + LOG.debug("Returning snapshot of state: " + snapshot); + return snapshot; + } + + /** + * Take a snapshot of the transaction state and serialize it into the given output stream. + * @return whether a snapshot was taken. 
+ */ + public boolean takeSnapshot(OutputStream out) throws IOException { + TransactionSnapshot snapshot = getSnapshot(); + if (snapshot != null) { + persistor.writeSnapshot(out, snapshot); + return true; + } else { + return false; + } + } + + private void doSnapshot(boolean closing) throws IOException { + long snapshotTime = 0L; + TransactionSnapshot snapshot = null; + TransactionLog oldLog = null; + try { + this.logWriteLock.lock(); + try { + synchronized (this) { + snapshot = getSnapshot(); + if (snapshot == null && !closing) { + return; + } + if (snapshot != null) { + snapshotTime = snapshot.getTimestamp(); + } + + // roll WAL + oldLog = currentLog; + if (!closing) { + currentLog = persistor.createLog(snapshot.getTimestamp()); + } + } + // there may not be an existing log on startup + if (oldLog != null) { + oldLog.close(); + } + } finally { + this.logWriteLock.unlock(); + } + + // save snapshot + if (snapshot != null) { + persistor.writeSnapshot(snapshot); + lastSnapshotTime = snapshotTime; + + // clean any obsoleted snapshots and WALs + long oldestRetainedTimestamp = persistor.deleteOldSnapshots(snapshotRetainCount); + persistor.deleteLogsOlderThan(oldestRetainedTimestamp); + } + } catch (IOException ioe) { + abortService("Snapshot (timestamp " + snapshotTime + ") failed due to: " + ioe.getMessage(), ioe); + } + } + + public synchronized TransactionSnapshot getCurrentState() { + return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer, + invalid, inProgress, committingChangeSets, committedChangeSets); + } + + public synchronized void recoverState() { + try { + TransactionSnapshot lastSnapshot = persistor.getLatestSnapshot(); + // if we failed before a snapshot could complete, we might not have one to restore + if (lastSnapshot != null) { + restoreSnapshot(lastSnapshot); + } + // replay any WALs since the last snapshot + Collection<TransactionLog> logs = persistor.getLogsSince(lastSnapshotTime); + if (logs != null) { + 
replayLogs(logs); + } + } catch (IOException e) { + LOG.error("Unable to read back transaction state:", e); + throw Throwables.propagate(e); + } + } + + /** + * Restore the initial in-memory transaction state from a snapshot. + */ + private void restoreSnapshot(TransactionSnapshot snapshot) { + LOG.info("Restoring transaction state from snapshot at " + snapshot.getTimestamp()); + Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!"); + Preconditions.checkState(readPointer == 0, "readPointer has been set!"); + Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!"); + Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!"); + Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!"); + Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should be empty!"); + Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be empty!"); + LOG.info("Restoring snapshot of state: " + snapshot); + + lastSnapshotTime = snapshot.getTimestamp(); + readPointer = snapshot.getReadPointer(); + lastWritePointer = snapshot.getWritePointer(); + invalid.addAll(snapshot.getInvalid()); + inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress())); + committingChangeSets.putAll(snapshot.getCommittingChangeSets()); + committedChangeSets.putAll(snapshot.getCommittedChangeSets()); + } + + /** + * Check if in-progress transactions need to be migrated to have expiration time and type, if so do the migration. + * This is required for backwards compatibility, when long running transactions were represented + * with expiration time -1. This can be removed when we stop supporting SnapshotCodec version 1. 
+ */ + public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance, + Map<Long, InProgressTx> inProgress) { + for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) { + long writePointer = entry.getKey(); + long expiration = entry.getValue().getExpiration(); + // LONG transactions will either have a negative expiration or expiration set to the long timeout + // use a fudge factor on the expiration check, since expiraton is set based on system time, not the write pointer + if (entry.getValue().getType() == null && + (expiration < 0 || + (getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration + < longTimeoutTolerance))) { + // handle null expiration + long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout); + InProgressTx compatTx = + new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG, + entry.getValue().getCheckpointWritePointers()); + entry.setValue(compatTx); + } else if (entry.getValue().getType() == null) { + InProgressTx compatTx = + new InProgressTx(entry.getValue().getVisibilityUpperBound(), entry.getValue().getExpiration(), + TransactionType.SHORT, entry.getValue().getCheckpointWritePointers()); + entry.setValue(compatTx); + } + } + return inProgress; + } + + /** + * Resets the state of the transaction manager. 
+ */ + public void resetState() { + this.logWriteLock.lock(); + try { + // Take a snapshot before resetting the state, for debugging purposes + doSnapshot(false); + // Clear the state + clear(); + // Take another snapshot: if no snapshot is taken after clearing the state + // and the manager is restarted, we will recover from the snapshot taken + // before resetting the state, which would be really bad + // This call will also init a new WAL + doSnapshot(false); + } catch (IOException e) { + LOG.error("Snapshot failed when resetting state!", e); + e.printStackTrace(); + } finally { + this.logWriteLock.unlock(); + } + } + + /** + * Replay all logged edits from the given transaction logs. + */ + private void replayLogs(Collection<TransactionLog> logs) { + for (TransactionLog log : logs) { + LOG.info("Replaying edits from transaction log " + log.getName()); + int editCnt = 0; + try { + TransactionLogReader reader = log.getReader(); + // reader may be null in the case of an empty file + if (reader == null) { + continue; + } + TransactionEdit edit = null; + while ((edit = reader.next()) != null) { + editCnt++; + switch (edit.getState()) { + case INPROGRESS: + long expiration = edit.getExpiration(); + TransactionType type = edit.getType(); + // Check if transaction needs to be migrated to have expiration and type. Previous version of + // long running transactions were represented with expiration time as -1. + // This can be removed when we stop supporting TransactionEditCodecV2. 
+ if (expiration < 0) { + expiration = getTxExpirationFromWritePointer(edit.getWritePointer(), defaultLongTimeout); + type = TransactionType.LONG; + } else if (type == null) { + type = TransactionType.SHORT; + } + addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), + expiration, type); + break; + case COMMITTING: + addCommittingChangeSet(edit.getWritePointer(), edit.getChanges()); + break; + case COMMITTED: + // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140 + long transactionId = edit.getWritePointer(); + long[] checkpointPointers = edit.getCheckpointPointers(); + long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ? + transactionId : checkpointPointers[checkpointPointers.length - 1]; + doCommit(transactionId, writePointer, edit.getChanges(), + edit.getCommitPointer(), edit.getCanCommit()); + break; + case INVALID: + doInvalidate(edit.getWritePointer()); + break; + case ABORTED: + type = edit.getType(); + // Check if transaction edit needs to be migrated to have type. Previous versions of + // ABORTED edits did not contain type. + // This can be removed when we stop supporting TransactionEditCodecV2. + if (type == null) { + InProgressTx inProgressTx = inProgress.get(edit.getWritePointer()); + if (inProgressTx != null) { + type = inProgressTx.getType(); + } else { + // If transaction is not in-progress, then it has either been already aborted or invalidated. + // We cannot determine the transaction's state based on current information, to be safe invalidate it. 
+ LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay", + edit.getWritePointer()); + doInvalidate(edit.getWritePointer()); + break; + } + } + doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), type); + break; + case TRUNCATE_INVALID_TX: + if (edit.getTruncateInvalidTxTime() != 0) { + doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime()); + } else { + doTruncateInvalidTx(edit.getTruncateInvalidTx()); + } + break; + case CHECKPOINT: + doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer()); + break; + default: + // unknown type! + throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState()); + } + } + } catch (IOException ioe) { + throw Throwables.propagate(ioe); + } catch (InvalidTruncateTimeException e) { + throw Throwables.propagate(e); + } + LOG.info("Read " + editCnt + " edits from log " + log.getName()); + } + } + + @Override + public void doStop() { + Stopwatch timer = new Stopwatch().start(); + LOG.info("Shutting down gracefully..."); + // signal the cleanup thread to stop + if (cleanupThread != null) { + cleanupThread.shutdown(); + try { + cleanupThread.join(30000L); + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for cleanup thread to stop"); + Thread.currentThread().interrupt(); + } + } + if (metricsThread != null) { + metricsThread.shutdown(); + try { + metricsThread.join(30000L); + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for cleanup thread to stop"); + Thread.currentThread().interrupt(); + } + } + if (snapshotThread != null) { + // this will trigger a final snapshot on stop + snapshotThread.shutdown(); + try { + snapshotThread.join(30000L); + } catch (InterruptedException ie) { + LOG.warn("Interrupted waiting for snapshot thread to stop"); + Thread.currentThread().interrupt(); + } + } + + persistor.stopAndWait(); + txMetricsCollector.stop(); + timer.stop(); + LOG.info("Took " + timer + " to stop"); + notifyStopped(); + } 
+ + /** + * Immediately shuts down the service, without going through the normal close process. + * @param message A message describing the source of the failure. + * @param error Any exception that caused the failure. + */ + private void abortService(String message, Throwable error) { + if (isRunning()) { + LOG.error("Aborting transaction manager due to: " + message, error); + notifyFailed(error); + } + } + + private void ensureAvailable() { + Preconditions.checkState(isRunning(), "Transaction Manager is not running."); + } + + /** + * Start a short transaction with the default timeout. + */ + public Transaction startShort() { + return startShort(defaultTimeout); + } + + /** + * Start a short transaction with a given timeout. + * @param timeoutInSeconds the time out period in seconds. + */ + public Transaction startShort(int timeoutInSeconds) { + Preconditions.checkArgument(timeoutInSeconds > 0, "timeout must be positive but is %s", timeoutInSeconds); + txMetricsCollector.rate("start.short"); + Stopwatch timer = new Stopwatch().start(); + long expiration = getTxExpiration(timeoutInSeconds); + Transaction tx = startTx(expiration, TransactionType.SHORT); + txMetricsCollector.histogram("start.short.latency", (int) timer.elapsedMillis()); + return tx; + } + + private static long getTxExpiration(long timeoutInSeconds) { + long currentTime = System.currentTimeMillis(); + return currentTime + TimeUnit.SECONDS.toMillis(timeoutInSeconds); + } + + public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) { + return writePointer / TxConstants.MAX_TX_PER_MS + TimeUnit.SECONDS.toMillis(timeoutInSeconds); + } + + private long getNextWritePointer() { + // We want to align tx ids with current time. We assume that tx ids are sequential, but not less than + // System.currentTimeMillis() * MAX_TX_PER_MS. + return Math.max(lastWritePointer + 1, System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS); + } + + /** + * Start a long transaction. 
Long transactions and do not participate in conflict detection. Also, aborting a long + * transaction moves it to the invalid list because we assume that its writes cannot be rolled back. + */ + public Transaction startLong() { + txMetricsCollector.rate("start.long"); + Stopwatch timer = new Stopwatch().start(); + long expiration = getTxExpiration(defaultLongTimeout); + Transaction tx = startTx(expiration, TransactionType.LONG); + txMetricsCollector.histogram("start.long.latency", (int) timer.elapsedMillis()); + return tx; + } + + private Transaction startTx(long expiration, TransactionType type) { + Transaction tx = null; + long txid; + // guard against changes to the transaction log while processing + this.logReadLock.lock(); + try { + synchronized (this) { + ensureAvailable(); + txid = getNextWritePointer(); + tx = createTransaction(txid, type); + addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type); + } + // appending to WAL out of global lock for concurrent performance + // we should still be able to arrive at the same state even if log entries are out of order + appendToLog(TransactionEdit.createStarted(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type)); + } finally { + this.logReadLock.unlock(); + } + return tx; + } + + private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, + long expiration, TransactionType type) { + inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type)); + advanceWritePointer(writePointer); + } + + private void advanceWritePointer(long writePointer) { + // don't move the write pointer back if we have out of order transaction log entries + if (writePointer > lastWritePointer) { + lastWritePointer = writePointer; + } + } + + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { + txMetricsCollector.rate("canCommit"); + Stopwatch timer = new Stopwatch().start(); 
+ if (inProgress.get(tx.getTransactionId()) == null) { + // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. + if (invalid.contains(tx.getTransactionId())) { + throw new TransactionNotInProgressException( + String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", + tx.getTransactionId())); + } else { + throw new TransactionNotInProgressException( + String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); + } + } + + Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size()); + for (byte[] change : changeIds) { + set.add(new ChangeId(change)); + } + + if (hasConflicts(tx, set)) { + return false; + } + // guard against changes to the transaction log while processing + this.logReadLock.lock(); + try { + synchronized (this) { + ensureAvailable(); + addCommittingChangeSet(tx.getTransactionId(), set); + } + appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set)); + } finally { + this.logReadLock.unlock(); + } + txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis()); + return true; + } + + private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) { + committingChangeSets.put(writePointer, changes); + } + + public boolean commit(Transaction tx) throws TransactionNotInProgressException { + txMetricsCollector.rate("commit"); + Stopwatch timer = new Stopwatch().start(); + Set<ChangeId> changeSet = null; + boolean addToCommitted = true; + long commitPointer; + // guard against changes to the transaction log while processing + this.logReadLock.lock(); + try { + synchronized (this) { + ensureAvailable(); + // we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that + // are no longer visible by any in-progress transactions + commitPointer = lastWritePointer + 1; + if (inProgress.get(tx.getTransactionId()) == 
null) {
+          // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
+          if (invalid.contains(tx.getTransactionId())) {
+            throw new TransactionNotInProgressException(
+              String.format("commit() is called for transaction %d that is not in progress " +
+                              "(it is known to be invalid)", tx.getTransactionId()));
+          } else {
+            throw new TransactionNotInProgressException(
+              String.format("commit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+          }
+        }
+
+        // these should be atomic
+        // NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot
+        // be attempted to commit twice
+        changeSet = committingChangeSets.remove(tx.getTransactionId());
+
+        if (changeSet != null) {
+          // double-checking if there are conflicts: someone may have committed since canCommit check
+          if (hasConflicts(tx, changeSet)) {
+            return false;
+          }
+        } else {
+          // no changes
+          addToCommitted = false;
+        }
+        doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted);
+      }
+      appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted));
+    } finally {
+      this.logReadLock.unlock();
+    }
+    txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis());
+    return true;
+  }
+
+  private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer,
+                        boolean addToCommitted) {
+    // In case this method is called when loading a previous WAL, we need to remove the tx from these sets
+    committingChangeSets.remove(transactionId);
+    if (addToCommitted && !changes.isEmpty()) {
+      // No need to add empty changes to the committed change sets, they will never trigger any conflict
+
+      // Record the committed change set with the next writePointer as the commit time.
+ // NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be + // stored under one key + Set<ChangeId> changeIds = committedChangeSets.get(commitPointer); + if (changeIds != null) { + // NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. in + // canCommit) use it unguarded + changes.addAll(changeIds); + } + committedChangeSets.put(commitPointer, changes); + } + // remove from in-progress set, so that it does not get excluded in the future + InProgressTx previous = inProgress.remove(transactionId); + if (previous == null) { + // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. + if (invalid.rem(transactionId)) { + invalidArray = invalid.toLongArray(); + LOG.info("Tx invalid list: removed committed tx {}", transactionId); + } + } + // moving read pointer + moveReadPointerIfNeeded(writePointer); + + // All committed change sets that are smaller than the earliest started transaction can be removed. + // here we ignore transactions that have no timeout, they are long-running and don't participate in + // conflict detection. + // TODO: for efficiency, can we do this once per-log in replayLogs instead of once per edit? 
+ committedChangeSets.headMap(TxUtils.getFirstShortInProgress(inProgress)).clear(); + } + + public void abort(Transaction tx) { + // guard against changes to the transaction log while processing + txMetricsCollector.rate("abort"); + Stopwatch timer = new Stopwatch().start(); + this.logReadLock.lock(); + try { + synchronized (this) { + ensureAvailable(); + doAbort(tx.getTransactionId(), tx.getCheckpointWritePointers(), tx.getType()); + } + appendToLog(TransactionEdit.createAborted(tx.getTransactionId(), tx.getType(), tx.getCheckpointWritePointers())); + txMetricsCollector.histogram("abort.latency", (int) timer.elapsedMillis()); + } finally { + this.logReadLock.unlock(); + } + } + + private void doAbort(long writePointer, long[] checkpointWritePointers, TransactionType type) { + committingChangeSets.remove(writePointer); + + if (type == TransactionType.LONG) { + // Long running transactions cannot be aborted as their change sets are not saved, + // and hence the changes cannot be rolled back. Invalidate the long running transaction instead. + doInvalidate(writePointer); + return; + } + + // makes tx visible (assumes that all operations were rolled back) + // remove from in-progress set, so that it does not get excluded in the future + InProgressTx removed = inProgress.remove(writePointer); + if (removed == null) { + // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. 
+ if (invalid.rem(writePointer)) { + // remove any invalidated checkpoint pointers + // this will only be present if the parent write pointer was also invalidated + if (checkpointWritePointers != null) { + for (int i = 0; i < checkpointWritePointers.length; i++) { + invalid.rem(checkpointWritePointers[i]); + } + } + invalidArray = invalid.toLongArray(); + LOG.info("Tx invalid list: removed aborted tx {}", writePointer); + // removed a tx from excludes: must move read pointer + moveReadPointerIfNeeded(writePointer); + } + } else { + // removed a tx from excludes: must move read pointer + moveReadPointerIfNeeded(writePointer); + } + } + + public boolean invalidate(long tx) { + // guard against changes to the transaction log while processing + txMetricsCollector.rate("invalidate"); + Stopwatch timer = new Stopwatch().start(); + this.logReadLock.lock(); + try { + boolean success; + synchronized (this) { + ensureAvailable(); + success = doInvalidate(tx); + } + appendToLog(TransactionEdit.createInvalid(tx)); + txMetricsCollector.histogram("invalidate.latency", (int) timer.elapsedMillis()); + return success; + } finally { + this.logReadLock.unlock(); + } + } + + private boolean doInvalidate(long writePointer) { + Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer); + // remove from in-progress set, so that it does not get excluded in the future + InProgressTx previous = inProgress.remove(writePointer); + // This check is to prevent from invalidating committed transactions + if (previous != null || previousChangeSet != null) { + // add tx to invalids + invalid.add(writePointer); + if (previous == null) { + LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer); + } else { + // invalidate any checkpoint write pointers + LongArrayList childWritePointers = previous.getCheckpointWritePointers(); + if (childWritePointers != null) { + for (int i = 0; i < childWritePointers.size(); i++) { + 
invalid.add(childWritePointers.get(i)); + } + } + } + LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer); + // todo: find a more efficient way to keep this sorted. Could it just be an array? + Collections.sort(invalid); + invalidArray = invalid.toLongArray(); + if (previous != null && !previous.isLongRunning()) { + // tx was short-running: must move read pointer + moveReadPointerIfNeeded(writePointer); + } + return true; + } + return false; + } + + /** + * Removes the given transaction ids from the invalid list. + * @param invalidTxIds transaction ids + * @return true if invalid list got changed, false otherwise + */ + public boolean truncateInvalidTx(Set<Long> invalidTxIds) { + // guard against changes to the transaction log while processing + txMetricsCollector.rate("truncateInvalidTx"); + Stopwatch timer = new Stopwatch().start(); + this.logReadLock.lock(); + try { + boolean success; + synchronized (this) { + ensureAvailable(); + success = doTruncateInvalidTx(invalidTxIds); + } + appendToLog(TransactionEdit.createTruncateInvalidTx(invalidTxIds)); + txMetricsCollector.histogram("truncateInvalidTx.latency", (int) timer.elapsedMillis()); + return success; + } finally { + this.logReadLock.unlock(); + } + } + + private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) { + LOG.info("Removing tx ids {} from invalid list", invalidTxIds); + boolean success = invalid.removeAll(invalidTxIds); + if (success) { + invalidArray = invalid.toLongArray(); + } + return success; + } + + /** + * Removes all transaction ids started before the given time from invalid list. 
+ * @param time time in milliseconds + * @return true if invalid list got changed, false otherwise + * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time + */ + public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { + // guard against changes to the transaction log while processing + txMetricsCollector.rate("truncateInvalidTxBefore"); + Stopwatch timer = new Stopwatch().start(); + this.logReadLock.lock(); + try { + boolean success; + synchronized (this) { + ensureAvailable(); + success = doTruncateInvalidTxBefore(time); + } + appendToLog(TransactionEdit.createTruncateInvalidTxBefore(time)); + txMetricsCollector.histogram("truncateInvalidTxBefore.latency", (int) timer.elapsedMillis()); + return success; + } finally { + this.logReadLock.unlock(); + } + } + + private boolean doTruncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { + LOG.info("Removing tx ids before {} from invalid list", time); + long truncateWp = time * TxConstants.MAX_TX_PER_MS; + // Check if there any in-progress transactions started earlier than truncate time + if (inProgress.lowerKey(truncateWp) != null) { + throw new InvalidTruncateTimeException("Transactions started earlier than " + time + " are in-progress"); + } + + // Find all invalid transactions earlier than truncateWp + Set<Long> toTruncate = Sets.newHashSet(); + for (long wp : invalid) { + // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp + if (wp >= truncateWp) { + break; + } + toTruncate.add(wp); + } + return doTruncateInvalidTx(toTruncate); + } + + public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException { + txMetricsCollector.rate("checkpoint"); + Stopwatch timer = new Stopwatch().start(); + + Transaction checkpointedTx = null; + long txId = originalTx.getTransactionId(); + long newWritePointer = 0; + // guard against changes to the transaction log while 
processing + this.logReadLock.lock(); + try { + synchronized (this) { + ensureAvailable(); + // check that the parent tx is in progress + InProgressTx parentTx = inProgress.get(txId); + if (parentTx == null) { + if (invalid.contains(txId)) { + throw new TransactionNotInProgressException( + String.format("Transaction %d is not in progress because it was invalidated", txId)); + } else { + throw new TransactionNotInProgressException( + String.format("Transaction %d is not in progress", txId)); + } + } + newWritePointer = getNextWritePointer(); + doCheckpoint(newWritePointer, txId); + // create a new transaction with the same read snapshot, plus the additional checkpoint write pointer + // the same read snapshot is maintained to + checkpointedTx = new Transaction(originalTx, newWritePointer, + parentTx.getCheckpointWritePointers().toLongArray()); + } + // appending to WAL out of global lock for concurrent performance + // we should still be able to arrive at the same state even if log entries are out of order + appendToLog(TransactionEdit.createCheckpoint(newWritePointer, txId)); + } finally { + this.logReadLock.unlock(); + } + txMetricsCollector.histogram("checkpoint.latency", (int) timer.elapsedMillis()); + + return checkpointedTx; + } + + private void doCheckpoint(long newWritePointer, long parentWritePointer) { + InProgressTx existingTx = inProgress.get(parentWritePointer); + existingTx.addCheckpointWritePointer(newWritePointer); + advanceWritePointer(newWritePointer); + } + + // hack for exposing important metric + public int getExcludedListSize() { + return invalid.size() + inProgress.size(); + } + + /** + * @return the size of invalid list + */ + public int getInvalidSize() { + return this.invalid.size(); + } + + int getCommittedSize() { + return this.committedChangeSets.size(); + } + + private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) { + if (changeIds.isEmpty()) { + return false; + } + + for (Map.Entry<Long, Set<ChangeId>> changeSet : 
committedChangeSets.entrySet()) { + // If commit time is greater than tx read-pointer, + // basically not visible but committed means "tx committed after given tx was started" + if (changeSet.getKey() > tx.getTransactionId()) { + if (overlap(changeSet.getValue(), changeIds)) { + return true; + } + } + } + return false; + } + + private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) { + // iterate over the smaller set, and check for every element in the other set + if (a.size() > b.size()) { + for (ChangeId change : b) { + if (a.contains(change)) { + return true; + } + } + } else { + for (ChangeId change : a) { + if (b.contains(change)) { + return true; + } + } + } + return false; + } + + private void moveReadPointerIfNeeded(long committedWritePointer) { + if (committedWritePointer > readPointer) { + readPointer = committedWritePointer; + } + } + + /** + * Creates a new Transaction. This method only get called from start transaction, which is already + * synchronized. + */ + private Transaction createTransaction(long writePointer, TransactionType type) { + // For holding the first in progress short transaction Id (with timeout >= 0). 
+ long firstShortTx = Transaction.NO_TX_IN_PROGRESS; + LongArrayList inProgressIds = new LongArrayList(inProgress.size()); + for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) { + long txId = entry.getKey(); + inProgressIds.add(txId); + // add any checkpointed write pointers to the in-progress list + LongArrayList childIds = entry.getValue().getCheckpointWritePointers(); + if (childIds != null) { + for (int i = 0; i < childIds.size(); i++) { + inProgressIds.add(childIds.get(i)); + } + } + if (firstShortTx == Transaction.NO_TX_IN_PROGRESS && !entry.getValue().isLongRunning()) { + firstShortTx = txId; + } + } + + return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type); + } + + private void appendToLog(TransactionEdit edit) { + try { + Stopwatch timer = new Stopwatch().start(); + currentLog.append(edit); + txMetricsCollector.rate("wal.append.count"); + txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis()); + } catch (IOException ioe) { + abortService("Error appending to transaction log", ioe); + } + } + + private void appendToLog(List<TransactionEdit> edits) { + try { + Stopwatch timer = new Stopwatch().start(); + currentLog.append(edits); + txMetricsCollector.rate("wal.append.count", edits.size()); + txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis()); + } catch (IOException ioe) { + abortService("Error appending to transaction log", ioe); + } + } + + /** + * Called from the tx service every 10 seconds. + * This hack is needed because current metrics system is not flexible when it comes to adding new metrics. 
+ */ + public void logStatistics() { + LOG.info("Transaction Statistics: write pointer = " + lastWritePointer + + ", invalid = " + invalid.size() + + ", in progress = " + inProgress.size() + + ", committing = " + committingChangeSets.size() + + ", committed = " + committedChangeSets.size()); + } + + private abstract static class DaemonThreadExecutor extends Thread { + private AtomicBoolean stopped = new AtomicBoolean(false); + + public DaemonThreadExecutor(String name) { + super(name); + setDaemon(true); + } + + public void run() { + try { + while (!isInterrupted() && !stopped.get()) { + doRun(); + synchronized (stopped) { + stopped.wait(getSleepMillis()); + } + } + } catch (InterruptedException ie) { + LOG.info("Interrupted thread " + getName()); + } + // perform any final cleanup + onShutdown(); + LOG.info("Exiting thread " + getName()); + } + + public abstract void doRun(); + + protected abstract long getSleepMillis(); + + protected void onShutdown() { + } + + public void shutdown() { + if (stopped.compareAndSet(false, true)) { + synchronized (stopped) { + stopped.notifyAll(); + } + } + } + } + + /** + * Represents some of the info on in-progress tx + */ + public static final class InProgressTx { + /** the oldest in progress tx at the time of this tx start */ + private final long visibilityUpperBound; + private final long expiration; + private final TransactionType type; + private LongArrayList checkpointWritePointers = new LongArrayList(); + + public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type) { + this(visibilityUpperBound, expiration, type, new LongArrayList()); + } + + public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type, + LongArrayList checkpointWritePointers) { + this.visibilityUpperBound = visibilityUpperBound; + this.expiration = expiration; + this.type = type; + this.checkpointWritePointers = checkpointWritePointers; + } + + // For backwards compatibility when long running txns were 
represented with -1 expiration + @Deprecated + public InProgressTx(long visibilityUpperBound, long expiration) { + this(visibilityUpperBound, expiration, null); + } + + public long getVisibilityUpperBound() { + return visibilityUpperBound; + } + + public long getExpiration() { + return expiration; + } + + @Nullable + public TransactionType getType() { + return type; + } + + public boolean isLongRunning() { + if (type == null) { + // for backwards compatibility when long running txns were represented with -1 expiration + return expiration == -1; + } + return type == TransactionType.LONG; + } + + public void addCheckpointWritePointer(long checkpointWritePointer) { + checkpointWritePointers.add(checkpointWritePointer); + } + + public LongArrayList getCheckpointWritePointers() { + return checkpointWritePointers; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof InProgressTx)) { + return false; + } + + if (this == o) { + return true; + } + + InProgressTx other = (InProgressTx) o; + return Objects.equal(visibilityUpperBound, other.getVisibilityUpperBound()) && + Objects.equal(expiration, other.getExpiration()) && + Objects.equal(type, other.type) && + Objects.equal(checkpointWritePointers, other.checkpointWritePointers); + } + + @Override + public int hashCode() { + return Objects.hashCode(visibilityUpperBound, expiration, type, checkpointWritePointers); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("visibilityUpperBound", visibilityUpperBound) + .add("expiration", expiration) + .add("type", type) + .add("checkpointWritePointers", checkpointWritePointers) + .toString(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java b/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java new file mode 100644 index 0000000..0a9dd1d --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionServiceMain.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.tephra.util.ConfigurationFactory; +import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + +/** + * Driver class to start and stop tx in distributed mode. + */ +public class TransactionServiceMain { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionServiceMain.class); + + private Configuration conf; + private TransactionService txService; + + public static void main(String args[]) throws Exception { + TransactionServiceMain instance = new TransactionServiceMain(); + instance.doMain(args); + } + + public TransactionServiceMain() { + this(null); + } + + public TransactionServiceMain(Configuration conf) { + this.conf = conf; + } + + /** + * The main method. It simply call methods in the same sequence + * as if the program is started by jsvc. + */ + public void doMain(final String[] args) throws Exception { + final CountDownLatch shutdownLatch = new CountDownLatch(1); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + try { + TransactionServiceMain.this.stop(); + } finally { + try { + TransactionServiceMain.this.destroy(); + } finally { + shutdownLatch.countDown(); + } + } + } catch (Throwable t) { + LOG.error("Exception when shutting down: " + t.getMessage(), t); + } + } + }); + init(args); + start(); + + shutdownLatch.await(); + } + + /** + * Invoked by jsvc to initialize the program. 
+ */ + public void init(String[] args) { + if (conf == null) { + conf = new ConfigurationFactory().get(); + } + } + + /** + * Invoked by jsvc to start the program. + */ + public void start() throws Exception { + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + + ZKClientService zkClientService = injector.getInstance(ZKClientService.class); + zkClientService.startAndWait(); + + // start a tx server + txService = injector.getInstance(TransactionService.class); + try { + LOG.info("Starting {}", getClass().getSimpleName()); + txService.startAndWait(); + } catch (Exception e) { + System.err.println("Failed to start service: " + e.getMessage()); + } + } + + /** + * Invoked by jsvc to stop the program. + */ + public void stop() { + LOG.info("Stopping {}", getClass().getSimpleName()); + if (txService == null) { + return; + } + try { + if (txService.isRunning()) { + txService.stopAndWait(); + } + } catch (Throwable e) { + LOG.error("Failed to shutdown transaction service.", e); + // because shutdown hooks execute concurrently, the logger may be closed already: thus also print it. + System.err.println("Failed to shutdown transaction service: " + e.getMessage()); + e.printStackTrace(System.err); + } + } + + /** + * Invoked by jsvc for resource cleanup. 
+ */ + public void destroy() { } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java new file mode 100644 index 0000000..9009f84 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import java.io.InputStream; +import java.util.Collection; +import java.util.Set; + +/** + * Client talking to transaction system. + * See also {@link TransactionAware}. + * todo: explain Omid. + */ +public interface TransactionSystemClient { + /** + * Starts new short transaction. + * @return instance of {@link Transaction} + */ + // TODO: "short" and "long" are very misleading names. Use transaction attributes like "detect conflicts or not", etc. + Transaction startShort(); + + /** + * Starts new short transaction. 
+   * @param timeout the timeout for the transaction
+   * @return instance of {@link Transaction}
+   */
+  Transaction startShort(int timeout);
+
+  /**
+   * Starts new long transaction.
+   * @return instance of {@link Transaction}
+   */
+  Transaction startLong();
+
+  // this pre-commit detects conflicts with other transactions committed so far
+  // NOTE: the change set should not change after this operation; this may help us do some extra optimizations
+  // NOTE: there should be a time constraint on how long it takes the client to commit changes after this operation
+  // is submitted, so that we can clean up related resources
+  // NOTE: as of now you can call this method multiple times, each time the changeSet of tx will be updated. Not sure
+  // if we can call it a feature or a side effect of the implementation. It makes more sense to append the change
+  // set, but before we really need it we don't do it because it would slow down tx manager throughput.
+
+  /**
+   * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
+   * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
+   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+   * <p/>
+   * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
+   * before we persist changes to avoid rollback in case of conflicts as much as possible.
+   * NOTE: in some situations we may want to skip this step to save on RPC with a risk of many rollback ops. So by
+   * default we take the safe path.
+   *
+   * @param tx transaction to verify
+   * @param changeIds ids of changes made by transaction
+   * @return true if transaction can be committed otherwise false
+   */
+  boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException;
+
+  /**
+   * Makes transaction visible.
It will again check conflicts of changes submitted previously with + * {@link #canCommit(Transaction, java.util.Collection)} + * @param tx transaction to make visible. + * @return true if transaction can be committed otherwise false + */ + boolean commit(Transaction tx) throws TransactionNotInProgressException; + + /** + * Makes transaction visible. You should call it only when all changes of this tx are undone. + * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out. + * @param tx transaction to make visible. + */ + void abort(Transaction tx); + + /** + * Makes transaction invalid. You should call it if not all changes of this tx could be undone. + * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out. + * @param tx transaction id to invalidate. + * @return true if transaction has been successfully invalidated + */ + boolean invalidate(long tx); + + /** + * Performs a checkpoint operation on the current transaction, returning a new Transaction instance with the + * updated state. A checkpoint operation assigns a new write pointer for the current transaction. + * @param tx the current transaction to checkpoint + * @return an updated transaction instance with the new write pointer + */ + Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException; + + /** + * Retrieves the state of the transaction manager and send it as a stream. The snapshot will not be persisted. + * @return an input stream containing an encoded snapshot of the transaction manager + */ + InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException; + + /** + * Return the status of the transaction Manager + * @return a String which denotes the status of txManager + */ + String status(); + + /** + * Resets the state of the transaction manager. + */ + void resetState(); + + /** + * Removes the given transaction ids from the invalid list. 
+ * @param invalidTxIds transaction ids + * @return true if invalid list got changed, false otherwise + */ + boolean truncateInvalidTx(Set<Long> invalidTxIds); + + /** + * Removes all transaction ids started before the given time from invalid list. + * @param time time in milliseconds + * @return true if invalid list got changed, false otherwise + * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time + */ + boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException; + + /** + * @return the size of invalid list + */ + int getInvalidSize(); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java new file mode 100644 index 0000000..7f7fc8c --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecV2; +import org.apache.tephra.snapshot.SnapshotCodecV3; +import org.apache.tephra.snapshot.SnapshotCodecV4; + +import java.util.concurrent.TimeUnit; + +/** + * Transaction system constants + */ +public class TxConstants { + /** + * Defines what level of conflict detection should be used for transactions. {@code ROW} means that only the + * table name and row key for each change will be used to determine if the transaction change sets conflict. + * {@code COLUMN} means that the table name, row key, column family, and column qualifier will all be used to + * identify write conflicts. {@code NONE} means that no conflict detection will be performed, but transaction + * clients will still track the current transaction's change set to rollback any persisted changes in the event of + * a failure. This should only be used where writes to the same coordinate should never conflict, such as + * append-only data. The default value used by {@code TransactionAwareHTable} implementations is {@code COLUMN}. + * + * <p> + * <strong>Note: for a given table, all clients must use the same conflict detection setting!</strong> + * Otherwise conflicts will not be flagged correctly. + * </p> + */ + public enum ConflictDetection { + ROW, + COLUMN, + NONE + } + + /** + * Property set for {@code org.apache.hadoop.hbase.HColumnDescriptor} to configure time-to-live on data within + * the column family. The value given is in milliseconds. Once a cell's data has surpassed the given value in age, + * the cell's data will no longer be visible and may be garbage collected. 
+ */ + public static final String PROPERTY_TTL = "dataset.table.ttl"; + + /** + * This is how many tx we allow per millisecond, if you care about the system for 100 years: + * Long.MAX_VALUE / (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365 * 100)) = + * (as of Feb 20, 2014) 2,028,653. It is safe and convenient to use 1,000,000 as multiplier: + * <ul> + * <li> + * we hardly can do more than 1 billion txs per second + * </li> + * <li> + * long value will not overflow for 200 years + * </li> + * <li> + * makes reading & debugging easier if multiplier is 10^n + * </li> + * </ul> + */ + public static final long MAX_TX_PER_MS = 1000000; + + /** + * Since HBase {@code Delete} operations cannot be undone at the same timestamp, "deleted" data is instead + * overwritten with an empty {@code byte[]} to flag it as removed. Cells with empty values will be filtered out + * of the results for read operations. If cells with empty values should be included in results (meaning data + * cannot be transactionally deleted), then set this configuration property to true. + */ + public static final String ALLOW_EMPTY_VALUES_KEY = "data.tx.allow.empty.values"; + public static final boolean ALLOW_EMPTY_VALUES_DEFAULT = false; + + /** + * Key used to set the serialized transaction as an attribute on Get and Scan operations. + */ + public static final String TX_OPERATION_ATTRIBUTE_KEY = "cask.tx"; + /** + * Key used to flag a delete operation as part of a transaction rollback. This is used so that the + * {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued + * as part of a normal client operation versus those performed when rolling back a transaction. + */ + public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "cask.tx.rollback"; + + /** + * Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted. 
+ */ + public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0]; + + // Constants for monitoring status + public static final String STATUS_OK = "OK"; + public static final String STATUS_NOTOK = "NOTOK"; + + /** + * Indicates whether data written before Tephra was enabled on a table should be readable. Reading non-transactional + * data can lead to slight performance penalty. Hence it is disabled by default. + * @see <a href="https://issues.cask.co/browse/TEPHRA-89">TEPHRA-89</a> + */ + public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; + public static final boolean DEFAULT_READ_NON_TX_DATA = false; + + /** + * TransactionManager configuration. + */ + public static final class Manager { + // TransactionManager configuration + public static final String CFG_DO_PERSIST = "tx.persist"; + /** Directory in HDFS used for transaction snapshot and log storage. */ + public static final String CFG_TX_SNAPSHOT_DIR = "data.tx.snapshot.dir"; + /** Directory on the local filesystem used for transaction snapshot and log storage. */ + public static final String CFG_TX_SNAPSHOT_LOCAL_DIR = "data.tx.snapshot.local.dir"; + /** How often to clean up timed out transactions, in seconds, or 0 for no cleanup. */ + public static final String CFG_TX_CLEANUP_INTERVAL = "data.tx.cleanup.interval"; + /** The user id to access HDFS if not running in secure HDFS. */ + public static final String CFG_TX_HDFS_USER = "data.tx.hdfs.user"; + /** Default value for how often to check in-progress transactions for expiration, in seconds. */ + public static final int DEFAULT_TX_CLEANUP_INTERVAL = 10; + /** + * The timeout for a transaction, in seconds. If the transaction is not finished in that time, + * it is marked invalid. + */ + public static final String CFG_TX_TIMEOUT = "data.tx.timeout"; + /** Default value for transaction timeout, in seconds. */ + public static final int DEFAULT_TX_TIMEOUT = 30; + /** + * The timeout for a long running transaction, in seconds. 
If the transaction is not finished in that time, + * it is marked invalid. + */ + public static final String CFG_TX_LONG_TIMEOUT = "data.tx.long.timeout"; + /** Default value for long running transaction timeout, in seconds. */ + public static final int DEFAULT_TX_LONG_TIMEOUT = (int) TimeUnit.DAYS.toSeconds(1); + /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */ + public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval"; + /** Default value for frequency of periodic snapshots of transaction state. */ + public static final long DEFAULT_TX_SNAPSHOT_INTERVAL = 300; + /** Number of most recent transaction snapshots to retain. */ + public static final String CFG_TX_SNAPSHOT_RETAIN = "data.tx.snapshot.retain"; + /** Default value for number of most recent snapshots to retain. */ + public static final int DEFAULT_TX_SNAPSHOT_RETAIN = 10; + } + + /** + * TransactionService configuration. + */ + public static final class Service { + + /** for the zookeeper quorum string for leader election for tx server. */ + public static final String CFG_DATA_TX_ZOOKEEPER_QUORUM + = "data.tx.zookeeper.quorum"; + + /** for the name used to announce service availability to discovery service */ + public static final String CFG_DATA_TX_DISCOVERY_SERVICE_NAME + = "data.tx.discovery.service.name"; + + /** for the port of the tx server. */ + public static final String CFG_DATA_TX_BIND_PORT + = "data.tx.bind.port"; + + /** for the address (hostname) of the tx server. */ + public static final String CFG_DATA_TX_BIND_ADDRESS + = "data.tx.bind.address"; + + /** the number of IO threads in the tx service. */ + public static final String CFG_DATA_TX_SERVER_IO_THREADS + = "data.tx.server.io.threads"; + + /** the number of handler threads in the tx service. 
*/ + public static final String CFG_DATA_TX_SERVER_THREADS + = "data.tx.server.threads"; + + public static final String CFG_DATA_TX_THRIFT_MAX_READ_BUFFER + = "data.tx.thrift.max.read.buffer"; + + public static final String DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME + = "transaction"; + + /** default tx service port. */ + public static final int DEFAULT_DATA_TX_BIND_PORT + = 15165; + + /** default tx service address. */ + public static final String DEFAULT_DATA_TX_BIND_ADDRESS + = "0.0.0.0"; + + /** default number of handler IO threads in tx service. */ + public static final int DEFAULT_DATA_TX_SERVER_IO_THREADS + = 2; + + /** default number of handler threads in tx service. */ + public static final int DEFAULT_DATA_TX_SERVER_THREADS + = 20; + + /** default thrift max read buffer size */ + public static final int DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER + = 16 * 1024 * 1024; + + // Configuration key names and defaults used by tx client. + + /** to specify the tx client socket timeout in ms. */ + public static final String CFG_DATA_TX_CLIENT_TIMEOUT + = "data.tx.client.timeout"; + + /** to specify the tx client provider strategy. */ + public static final String CFG_DATA_TX_CLIENT_PROVIDER + = "data.tx.client.provider"; + + /** to specify the number of threads for client provider "pool". */ + public static final String CFG_DATA_TX_CLIENT_COUNT + = "data.tx.client.count"; + + /** timeout (in milliseconds) for obtaining client from client provider "pool". */ + public static final String CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS + = "data.tx.client.obtain.timeout"; + + /** to specify the retry strategy for a failed thrift call. */ + public static final String CFG_DATA_TX_CLIENT_RETRY_STRATEGY + = "data.tx.client.retry.strategy"; + + /** to specify the number of times to retry a failed thrift call. */ + public static final String CFG_DATA_TX_CLIENT_ATTEMPTS + = "data.tx.client.retry.attempts"; + + /** to specify the initial sleep time for retry strategy backoff. 
*/ + public static final String CFG_DATA_TX_CLIENT_BACKOFF_INITIAL + = "data.tx.client.retry.backoff.initial"; + + /** to specify the backoff factor for retry strategy backoff. */ + public static final String CFG_DATA_TX_CLIENT_BACKOFF_FACTOR + = "data.tx.client.retry.backoff.factor"; + + /** to specify the sleep time limit for retry strategy backoff. */ + public static final String CFG_DATA_TX_CLIENT_BACKOFF_LIMIT + = "data.tx.client.retry.backoff.limit"; + + /** the default tx client socket timeout in milli seconds. */ + public static final int DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS + = 30 * 1000; + + /** default number of tx clients for client provider "pool". */ + public static final int DEFAULT_DATA_TX_CLIENT_COUNT + = 50; + + /** default timeout (in milliseconds) for obtaining client from client provider "pool". */ + public static final long DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS + = TimeUnit.SECONDS.toMillis(3); + + /** default tx client provider strategy. */ + public static final String DEFAULT_DATA_TX_CLIENT_PROVIDER + = "pool"; + + /** retry strategy for thrift clients, e.g. backoff, or n-times. */ + public static final String DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY + = "backoff"; + + /** default number of attempts for strategy n-times. */ + public static final int DEFAULT_DATA_TX_CLIENT_ATTEMPTS + = 2; + + /** default initial sleep is 100ms. */ + public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL + = 100; + + /** default backoff factor is 4. */ + public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR + = 4; + + /** default sleep limit is 30 sec. */ + public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT + = 30 * 1000; + } + + /** + * Configuration properties for metrics reporting + */ + public static final class Metrics { + /** + * Frequency at which metrics should be reported, in seconds. + */ + public static final String REPORT_PERIOD_KEY = "data.tx.metrics.period"; + /** + * Default report period for metrics, in seconds. 
+ */ + public static final int REPORT_PERIOD_DEFAULT = 60; + } + + /** + * Configuration properties used by HBase + */ + public static final class HBase { + public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; + public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000; + } + + /** + * Configuration for the TransactionDataJanitor coprocessor. + */ + public static final class DataJanitor { + /** + * Whether or not the TransactionDataJanitor coprocessor should be enabled on tables. + * Disable for testing. + */ + public static final String CFG_TX_JANITOR_ENABLE = "data.tx.janitor.enable"; + public static final boolean DEFAULT_TX_JANITOR_ENABLE = true; + } + + /** + * Configuration for the transaction snapshot persistence. + */ + public static final class Persist { + /** + * The class names of all known transaction snapshot codecs. + */ + public static final String CFG_TX_SNAPHOT_CODEC_CLASSES = "data.tx.snapshot.codecs"; + public static final Class[] DEFAULT_TX_SNAPHOT_CODEC_CLASSES = + { DefaultSnapshotCodec.class, SnapshotCodecV2.class, SnapshotCodecV3.class, SnapshotCodecV4.class }; + } + + /** + * Configuration for transaction log edit entries + */ + public static final class TransactionLog { + /** + * Key used to denote the number of entries appended. + */ + public static final String NUM_ENTRIES_APPENDED = "count"; + public static final String VERSION_KEY = "version"; + public static final byte CURRENT_VERSION = 2; + } + +}
