http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java deleted file mode 100644 index 3f51956..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java +++ /dev/null @@ -1,1398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -import co.cask.tephra.metrics.DefaultMetricsCollector; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.persist.NoOpTransactionStateStorage; -import co.cask.tephra.persist.TransactionEdit; -import co.cask.tephra.persist.TransactionLog; -import co.cask.tephra.persist.TransactionLogReader; -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import co.cask.tephra.util.TxUtils; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AbstractService; -import com.google.inject.Inject; -import it.unimi.dsi.fastutil.longs.LongArrayList; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * This is the central place to manage all active transactions in the system. 
- * - * A transaction consists of - * <ul> - * <li>A write pointer: This is the version used for all writes of that transaction.</li> - * <li>A read pointer: All reads under the transaction use this as an upper bound for the version.</li> - * <li>A set of excluded versions: These are the write versions of other transactions that must be excluded from - * reads, because those transactions are still in progress, or they failed but couldn't be properly rolled back.</li> - * </ul> - * To use the transaction system, a client must follow this sequence of steps: - * <ol> - * <li>Request a new transaction.</li> - * <li>Use the transaction to read and write datasets. Datasets are encouraged to cache the writes of the - * transaction in memory, to reduce the cost of rollback in case the transaction fails.</li> - * <li>Check whether the transaction has conflicts. For this, the set of change keys is submitted via canCommit(), - * and the transaction manager verifies that none of these keys are in conflict with other transactions that - * committed since the start of this transaction.</li> - * <li>If the transaction has conflicts: - * <ol> - * <li>Roll back the changes in every dataset that was changed. This can happen in-memory if the - * changes were cached.</li> - * <li>Abort the transaction to remove it from the active transactions.</li> - * </ol> - * </li> - * <li>If the transaction has no conflicts: - * <ol> - * <li>Persist all dataset changes to storage.</li> - * <li>Commit the transaction. This will repeat the conflict detection, because more overlapping transactions - * may have committed since the first conflict check.</li> - * <li>If the transaction has conflicts: - * <ol> - * <li>Roll back the changes in every dataset that was changed. 
This is more expensive because - * changes must be undone in persistent storage.</li> - * <li>Abort the transaction to remove it from the active transactions.</li> - * </ol> - * </ol> - * </ol> - * Transactions may be short or long-running. A short transaction is started with a timeout, and if it is not - * committed before that timeout, it is invalidated and excluded from future reads. A long-running transaction has - * no timeout and will remain active until it is committed or aborted. Long transactions are typically used in - * map/reduce jobs and can produce enormous amounts of changes. Therefore, long transactions do not participate in - * conflict detection (they would almost always have conflicts). We also assume that the changes of long transactions - * are not tracked, and therefore cannot be rolled back. Hence, when a long transaction is aborted, it remains in the - * list of excluded transactions to make its writes invisible. - */ -public class TransactionManager extends AbstractService { - // todo: optimize heavily - - private static final Logger LOG = LoggerFactory.getLogger(TransactionManager.class); - - // poll every 1 second to check whether a snapshot is needed - private static final long SNAPSHOT_POLL_INTERVAL = 1000L; - - //poll every 10 second to emit metrics - private static final long METRICS_POLL_INTERVAL = 10000L; - - private static final long[] NO_INVALID_TX = { }; - - // Transactions that are in progress, with their info. - private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long, InProgressTx>(); - - // the list of transactions that are invalid (not properly committed/aborted, or timed out) - // TODO: explain usage of two arrays - private final LongArrayList invalid = new LongArrayList(); - private long[] invalidArray = NO_INVALID_TX; - - // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil) - // todo: should this be consolidated with inProgress? 
- // commit time next writePointer -> changes made by this tx - private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = - new ConcurrentSkipListMap<Long, Set<ChangeId>>(); - // not committed yet - private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap(); - - private long readPointer; - private long lastWritePointer; - private MetricsCollector txMetricsCollector; - - private final TransactionStateStorage persistor; - - private final int cleanupInterval; - private final int defaultTimeout; - private final int defaultLongTimeout; - private DaemonThreadExecutor cleanupThread = null; - - private volatile TransactionLog currentLog; - - // timestamp of the last completed snapshot - private long lastSnapshotTime; - // frequency in millis to perform snapshots - private final long snapshotFrequencyInSeconds; - // number of most recent snapshots to retain - private final int snapshotRetainCount; - private DaemonThreadExecutor snapshotThread; - private DaemonThreadExecutor metricsThread; - - // lock guarding change of the current transaction log - private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock(); - private final Lock logReadLock = logLock.readLock(); - private final Lock logWriteLock = logLock.writeLock(); - - // fudge factor (in milliseconds) used when interpreting transactions as LONG based on expiration - // TODO: REMOVE WITH txnBackwardsCompatCheck() - private final long longTimeoutTolerance; - - public TransactionManager(Configuration config) { - this(config, new NoOpTransactionStateStorage(new SnapshotCodecProvider(config)), new DefaultMetricsCollector()); - } - - @Inject - public TransactionManager(Configuration conf, @Nonnull TransactionStateStorage persistor, - MetricsCollector txMetricsCollector) { - this.persistor = persistor; - cleanupInterval = conf.getInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, - TxConstants.Manager.DEFAULT_TX_CLEANUP_INTERVAL); - defaultTimeout = 
conf.getInt(TxConstants.Manager.CFG_TX_TIMEOUT, - TxConstants.Manager.DEFAULT_TX_TIMEOUT); - defaultLongTimeout = conf.getInt(TxConstants.Manager.CFG_TX_LONG_TIMEOUT, - TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT); - snapshotFrequencyInSeconds = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, - TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL); - // must always keep at least 1 snapshot - snapshotRetainCount = Math.max(conf.getInt(TxConstants.Manager.CFG_TX_SNAPSHOT_RETAIN, - TxConstants.Manager.DEFAULT_TX_SNAPSHOT_RETAIN), 1); - - // intentionally not using a constant, as this config should not be exposed - // TODO: REMOVE WITH txnBackwardsCompatCheck() - longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000); - - // - this.txMetricsCollector = txMetricsCollector; - this.txMetricsCollector.configure(conf); - clear(); - } - - private void clear() { - invalid.clear(); - invalidArray = NO_INVALID_TX; - inProgress.clear(); - committedChangeSets.clear(); - committingChangeSets.clear(); - lastWritePointer = 0; - readPointer = 0; - lastSnapshotTime = 0; - } - - private boolean isStopping() { - return State.STOPPING.equals(state()); - } - - @Override - public synchronized void doStart() { - LOG.info("Starting transaction manager."); - txMetricsCollector.start(); - // start up the persistor - persistor.startAndWait(); - try { - persistor.setupStorage(); - } catch (IOException e) { - Throwables.propagate(e); - } - // establish defaults in case there is no persistence - clear(); - // attempt to recover state from last run - recoverState(); - // start the periodic cleanup thread - startCleanupThread(); - startSnapshotThread(); - startMetricsThread(); - // initialize the WAL if we did not force a snapshot in recoverState() - initLog(); - // initialize next write pointer if needed - if (lastWritePointer == 0) { - lastWritePointer = getNextWritePointer(); - readPointer = lastWritePointer; - } - - notifyStarted(); - } - - private void initLog() { - 
if (currentLog == null) { - try { - currentLog = persistor.createLog(System.currentTimeMillis()); - } catch (IOException ioe) { - throw Throwables.propagate(ioe); - } - } - } - - private void startCleanupThread() { - if (cleanupInterval <= 0 || defaultTimeout <= 0) { - return; - } - LOG.info("Starting periodic timed-out transaction cleanup every " + cleanupInterval + - " seconds with default timeout of " + defaultTimeout + " seconds."); - this.cleanupThread = new DaemonThreadExecutor("tx-clean-timeout") { - @Override - public void doRun() { - cleanupTimedOutTransactions(); - } - - @Override - public long getSleepMillis() { - return cleanupInterval * 1000; - } - }; - cleanupThread.start(); - } - - private void startSnapshotThread() { - if (snapshotFrequencyInSeconds > 0) { - LOG.info("Starting periodic snapshot thread, frequency = " + snapshotFrequencyInSeconds + - " seconds, location = " + persistor.getLocation()); - this.snapshotThread = new DaemonThreadExecutor("tx-snapshot") { - @Override - public void doRun() { - long currentTime = System.currentTimeMillis(); - if (lastSnapshotTime < (currentTime - snapshotFrequencyInSeconds * 1000)) { - try { - doSnapshot(false); - } catch (IOException ioe) { - LOG.error("Periodic snapshot failed!", ioe); - } - } - } - - @Override - protected void onShutdown() { - // perform a final snapshot - try { - LOG.info("Writing final snapshot prior to shutdown"); - doSnapshot(true); - } catch (IOException ioe) { - LOG.error("Failed performing final snapshot on shutdown", ioe); - } - } - - @Override - public long getSleepMillis() { - return SNAPSHOT_POLL_INTERVAL; - } - }; - snapshotThread.start(); - } - } - - // Emits Transaction Data structures size as metrics - private void startMetricsThread() { - LOG.info("Starting periodic Metrics Emitter thread, frequency = " + METRICS_POLL_INTERVAL); - this.metricsThread = new DaemonThreadExecutor("tx-metrics") { - @Override - public void doRun() { - txMetricsCollector.gauge("committing.size", 
committingChangeSets.size()); - txMetricsCollector.gauge("committed.size", committedChangeSets.size()); - txMetricsCollector.gauge("inprogress.size", inProgress.size()); - txMetricsCollector.gauge("invalid.size", invalidArray.length); - } - - @Override - protected void onShutdown() { - // perform a final metrics emit - txMetricsCollector.gauge("committing.size", committingChangeSets.size()); - txMetricsCollector.gauge("committed.size", committedChangeSets.size()); - txMetricsCollector.gauge("inprogress.size", inProgress.size()); - txMetricsCollector.gauge("invalid.size", invalidArray.length); - } - - @Override - public long getSleepMillis() { - return METRICS_POLL_INTERVAL; - } - }; - metricsThread.start(); - } - - private void cleanupTimedOutTransactions() { - List<TransactionEdit> invalidEdits = null; - this.logReadLock.lock(); - try { - synchronized (this) { - if (!isRunning()) { - return; - } - - long currentTime = System.currentTimeMillis(); - List<Long> timedOut = Lists.newArrayList(); - for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) { - long expiration = tx.getValue().getExpiration(); - if (expiration >= 0L && currentTime > expiration) { - // timed out, remember tx id (can't remove while iterating over entries) - timedOut.add(tx.getKey()); - LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey()); - } else if (expiration < 0) { - LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " + - "migrated correctly, this transaction will be expired immediately", - tx.getKey(), expiration); - timedOut.add(tx.getKey()); - } - } - if (!timedOut.isEmpty()) { - invalidEdits = Lists.newArrayListWithCapacity(timedOut.size()); - invalid.addAll(timedOut); - for (long tx : timedOut) { - committingChangeSets.remove(tx); - inProgress.remove(tx); - invalidEdits.add(TransactionEdit.createInvalid(tx)); - } - - // todo: find a more efficient way to keep this sorted. Could it just be an array? 
- Collections.sort(invalid); - invalidArray = invalid.toLongArray(); - LOG.info("Invalidated {} transactions due to timeout.", timedOut.size()); - } - } - if (invalidEdits != null) { - appendToLog(invalidEdits); - } - } finally { - this.logReadLock.unlock(); - } - } - - public synchronized TransactionSnapshot getSnapshot() throws IOException { - TransactionSnapshot snapshot = null; - if (!isRunning() && !isStopping()) { - return null; - } - - long now = System.currentTimeMillis(); - // avoid duplicate snapshots at same timestamp - if (now == lastSnapshotTime || (currentLog != null && now == currentLog.getTimestamp())) { - try { - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException ie) { } - } - // copy in memory state - snapshot = getCurrentState(); - - LOG.debug("Starting snapshot of transaction state with timestamp {}", snapshot.getTimestamp()); - LOG.debug("Returning snapshot of state: " + snapshot); - return snapshot; - } - - /** - * Take a snapshot of the transaction state and serialize it into the given output stream. - * @return whether a snapshot was taken. 
- */ - public boolean takeSnapshot(OutputStream out) throws IOException { - TransactionSnapshot snapshot = getSnapshot(); - if (snapshot != null) { - persistor.writeSnapshot(out, snapshot); - return true; - } else { - return false; - } - } - - private void doSnapshot(boolean closing) throws IOException { - long snapshotTime = 0L; - TransactionSnapshot snapshot = null; - TransactionLog oldLog = null; - try { - this.logWriteLock.lock(); - try { - synchronized (this) { - snapshot = getSnapshot(); - if (snapshot == null && !closing) { - return; - } - if (snapshot != null) { - snapshotTime = snapshot.getTimestamp(); - } - - // roll WAL - oldLog = currentLog; - if (!closing) { - currentLog = persistor.createLog(snapshot.getTimestamp()); - } - } - // there may not be an existing log on startup - if (oldLog != null) { - oldLog.close(); - } - } finally { - this.logWriteLock.unlock(); - } - - // save snapshot - if (snapshot != null) { - persistor.writeSnapshot(snapshot); - lastSnapshotTime = snapshotTime; - - // clean any obsoleted snapshots and WALs - long oldestRetainedTimestamp = persistor.deleteOldSnapshots(snapshotRetainCount); - persistor.deleteLogsOlderThan(oldestRetainedTimestamp); - } - } catch (IOException ioe) { - abortService("Snapshot (timestamp " + snapshotTime + ") failed due to: " + ioe.getMessage(), ioe); - } - } - - public synchronized TransactionSnapshot getCurrentState() { - return TransactionSnapshot.copyFrom(System.currentTimeMillis(), readPointer, lastWritePointer, - invalid, inProgress, committingChangeSets, committedChangeSets); - } - - public synchronized void recoverState() { - try { - TransactionSnapshot lastSnapshot = persistor.getLatestSnapshot(); - // if we failed before a snapshot could complete, we might not have one to restore - if (lastSnapshot != null) { - restoreSnapshot(lastSnapshot); - } - // replay any WALs since the last snapshot - Collection<TransactionLog> logs = persistor.getLogsSince(lastSnapshotTime); - if (logs != null) { - 
replayLogs(logs); - } - } catch (IOException e) { - LOG.error("Unable to read back transaction state:", e); - throw Throwables.propagate(e); - } - } - - /** - * Restore the initial in-memory transaction state from a snapshot. - */ - private void restoreSnapshot(TransactionSnapshot snapshot) { - LOG.info("Restoring transaction state from snapshot at " + snapshot.getTimestamp()); - Preconditions.checkState(lastSnapshotTime == 0, "lastSnapshotTime has been set!"); - Preconditions.checkState(readPointer == 0, "readPointer has been set!"); - Preconditions.checkState(lastWritePointer == 0, "lastWritePointer has been set!"); - Preconditions.checkState(invalid.isEmpty(), "invalid list should be empty!"); - Preconditions.checkState(inProgress.isEmpty(), "inProgress map should be empty!"); - Preconditions.checkState(committingChangeSets.isEmpty(), "committingChangeSets should be empty!"); - Preconditions.checkState(committedChangeSets.isEmpty(), "committedChangeSets should be empty!"); - LOG.info("Restoring snapshot of state: " + snapshot); - - lastSnapshotTime = snapshot.getTimestamp(); - readPointer = snapshot.getReadPointer(); - lastWritePointer = snapshot.getWritePointer(); - invalid.addAll(snapshot.getInvalid()); - inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress())); - committingChangeSets.putAll(snapshot.getCommittingChangeSets()); - committedChangeSets.putAll(snapshot.getCommittedChangeSets()); - } - - /** - * Check if in-progress transactions need to be migrated to have expiration time and type, if so do the migration. - * This is required for backwards compatibility, when long running transactions were represented - * with expiration time -1. This can be removed when we stop supporting SnapshotCodec version 1. 
- */ - public static Map<Long, InProgressTx> txnBackwardsCompatCheck(int defaultLongTimeout, long longTimeoutTolerance, - Map<Long, InProgressTx> inProgress) { - for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) { - long writePointer = entry.getKey(); - long expiration = entry.getValue().getExpiration(); - // LONG transactions will either have a negative expiration or expiration set to the long timeout - // use a fudge factor on the expiration check, since expiraton is set based on system time, not the write pointer - if (entry.getValue().getType() == null && - (expiration < 0 || - (getTxExpirationFromWritePointer(writePointer, defaultLongTimeout) - expiration - < longTimeoutTolerance))) { - // handle null expiration - long newExpiration = getTxExpirationFromWritePointer(writePointer, defaultLongTimeout); - InProgressTx compatTx = - new InProgressTx(entry.getValue().getVisibilityUpperBound(), newExpiration, TransactionType.LONG, - entry.getValue().getCheckpointWritePointers()); - entry.setValue(compatTx); - } else if (entry.getValue().getType() == null) { - InProgressTx compatTx = - new InProgressTx(entry.getValue().getVisibilityUpperBound(), entry.getValue().getExpiration(), - TransactionType.SHORT, entry.getValue().getCheckpointWritePointers()); - entry.setValue(compatTx); - } - } - return inProgress; - } - - /** - * Resets the state of the transaction manager. 
- */ - public void resetState() { - this.logWriteLock.lock(); - try { - // Take a snapshot before resetting the state, for debugging purposes - doSnapshot(false); - // Clear the state - clear(); - // Take another snapshot: if no snapshot is taken after clearing the state - // and the manager is restarted, we will recover from the snapshot taken - // before resetting the state, which would be really bad - // This call will also init a new WAL - doSnapshot(false); - } catch (IOException e) { - LOG.error("Snapshot failed when resetting state!", e); - e.printStackTrace(); - } finally { - this.logWriteLock.unlock(); - } - } - - /** - * Replay all logged edits from the given transaction logs. - */ - private void replayLogs(Collection<TransactionLog> logs) { - for (TransactionLog log : logs) { - LOG.info("Replaying edits from transaction log " + log.getName()); - int editCnt = 0; - try { - TransactionLogReader reader = log.getReader(); - // reader may be null in the case of an empty file - if (reader == null) { - continue; - } - TransactionEdit edit = null; - while ((edit = reader.next()) != null) { - editCnt++; - switch (edit.getState()) { - case INPROGRESS: - long expiration = edit.getExpiration(); - TransactionType type = edit.getType(); - // Check if transaction needs to be migrated to have expiration and type. Previous version of - // long running transactions were represented with expiration time as -1. - // This can be removed when we stop supporting TransactionEditCodecV2. 
- if (expiration < 0) { - expiration = getTxExpirationFromWritePointer(edit.getWritePointer(), defaultLongTimeout); - type = TransactionType.LONG; - } else if (type == null) { - type = TransactionType.SHORT; - } - addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), - expiration, type); - break; - case COMMITTING: - addCommittingChangeSet(edit.getWritePointer(), edit.getChanges()); - break; - case COMMITTED: - // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140 - long transactionId = edit.getWritePointer(); - long[] checkpointPointers = edit.getCheckpointPointers(); - long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ? - transactionId : checkpointPointers[checkpointPointers.length - 1]; - doCommit(transactionId, writePointer, edit.getChanges(), - edit.getCommitPointer(), edit.getCanCommit()); - break; - case INVALID: - doInvalidate(edit.getWritePointer()); - break; - case ABORTED: - type = edit.getType(); - // Check if transaction edit needs to be migrated to have type. Previous versions of - // ABORTED edits did not contain type. - // This can be removed when we stop supporting TransactionEditCodecV2. - if (type == null) { - InProgressTx inProgressTx = inProgress.get(edit.getWritePointer()); - if (inProgressTx != null) { - type = inProgressTx.getType(); - } else { - // If transaction is not in-progress, then it has either been already aborted or invalidated. - // We cannot determine the transaction's state based on current information, to be safe invalidate it. 
- LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay", - edit.getWritePointer()); - doInvalidate(edit.getWritePointer()); - break; - } - } - doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), type); - break; - case TRUNCATE_INVALID_TX: - if (edit.getTruncateInvalidTxTime() != 0) { - doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime()); - } else { - doTruncateInvalidTx(edit.getTruncateInvalidTx()); - } - break; - case CHECKPOINT: - doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer()); - break; - default: - // unknown type! - throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState()); - } - } - } catch (IOException ioe) { - throw Throwables.propagate(ioe); - } catch (InvalidTruncateTimeException e) { - throw Throwables.propagate(e); - } - LOG.info("Read " + editCnt + " edits from log " + log.getName()); - } - } - - @Override - public void doStop() { - Stopwatch timer = new Stopwatch().start(); - LOG.info("Shutting down gracefully..."); - // signal the cleanup thread to stop - if (cleanupThread != null) { - cleanupThread.shutdown(); - try { - cleanupThread.join(30000L); - } catch (InterruptedException ie) { - LOG.warn("Interrupted waiting for cleanup thread to stop"); - Thread.currentThread().interrupt(); - } - } - if (metricsThread != null) { - metricsThread.shutdown(); - try { - metricsThread.join(30000L); - } catch (InterruptedException ie) { - LOG.warn("Interrupted waiting for cleanup thread to stop"); - Thread.currentThread().interrupt(); - } - } - if (snapshotThread != null) { - // this will trigger a final snapshot on stop - snapshotThread.shutdown(); - try { - snapshotThread.join(30000L); - } catch (InterruptedException ie) { - LOG.warn("Interrupted waiting for snapshot thread to stop"); - Thread.currentThread().interrupt(); - } - } - - persistor.stopAndWait(); - txMetricsCollector.stop(); - timer.stop(); - LOG.info("Took " + timer + " to stop"); - notifyStopped(); - } 
- - /** - * Immediately shuts down the service, without going through the normal close process. - * @param message A message describing the source of the failure. - * @param error Any exception that caused the failure. - */ - private void abortService(String message, Throwable error) { - if (isRunning()) { - LOG.error("Aborting transaction manager due to: " + message, error); - notifyFailed(error); - } - } - - private void ensureAvailable() { - Preconditions.checkState(isRunning(), "Transaction Manager is not running."); - } - - /** - * Start a short transaction with the default timeout. - */ - public Transaction startShort() { - return startShort(defaultTimeout); - } - - /** - * Start a short transaction with a given timeout. - * @param timeoutInSeconds the time out period in seconds. - */ - public Transaction startShort(int timeoutInSeconds) { - Preconditions.checkArgument(timeoutInSeconds > 0, "timeout must be positive but is %s", timeoutInSeconds); - txMetricsCollector.rate("start.short"); - Stopwatch timer = new Stopwatch().start(); - long expiration = getTxExpiration(timeoutInSeconds); - Transaction tx = startTx(expiration, TransactionType.SHORT); - txMetricsCollector.histogram("start.short.latency", (int) timer.elapsedMillis()); - return tx; - } - - private static long getTxExpiration(long timeoutInSeconds) { - long currentTime = System.currentTimeMillis(); - return currentTime + TimeUnit.SECONDS.toMillis(timeoutInSeconds); - } - - public static long getTxExpirationFromWritePointer(long writePointer, long timeoutInSeconds) { - return writePointer / TxConstants.MAX_TX_PER_MS + TimeUnit.SECONDS.toMillis(timeoutInSeconds); - } - - private long getNextWritePointer() { - // We want to align tx ids with current time. We assume that tx ids are sequential, but not less than - // System.currentTimeMillis() * MAX_TX_PER_MS. - return Math.max(lastWritePointer + 1, System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS); - } - - /** - * Start a long transaction. 
Long transactions and do not participate in conflict detection. Also, aborting a long - * transaction moves it to the invalid list because we assume that its writes cannot be rolled back. - */ - public Transaction startLong() { - txMetricsCollector.rate("start.long"); - Stopwatch timer = new Stopwatch().start(); - long expiration = getTxExpiration(defaultLongTimeout); - Transaction tx = startTx(expiration, TransactionType.LONG); - txMetricsCollector.histogram("start.long.latency", (int) timer.elapsedMillis()); - return tx; - } - - private Transaction startTx(long expiration, TransactionType type) { - Transaction tx = null; - long txid; - // guard against changes to the transaction log while processing - this.logReadLock.lock(); - try { - synchronized (this) { - ensureAvailable(); - txid = getNextWritePointer(); - tx = createTransaction(txid, type); - addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type); - } - // appending to WAL out of global lock for concurrent performance - // we should still be able to arrive at the same state even if log entries are out of order - appendToLog(TransactionEdit.createStarted(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type)); - } finally { - this.logReadLock.unlock(); - } - return tx; - } - - private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound, - long expiration, TransactionType type) { - inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type)); - advanceWritePointer(writePointer); - } - - private void advanceWritePointer(long writePointer) { - // don't move the write pointer back if we have out of order transaction log entries - if (writePointer > lastWritePointer) { - lastWritePointer = writePointer; - } - } - - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException { - txMetricsCollector.rate("canCommit"); - Stopwatch timer = new Stopwatch().start(); 
- if (inProgress.get(tx.getTransactionId()) == null) { - // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalid.contains(tx.getTransactionId())) { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", - tx.getTransactionId())); - } else { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); - } - } - - Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size()); - for (byte[] change : changeIds) { - set.add(new ChangeId(change)); - } - - if (hasConflicts(tx, set)) { - return false; - } - // guard against changes to the transaction log while processing - this.logReadLock.lock(); - try { - synchronized (this) { - ensureAvailable(); - addCommittingChangeSet(tx.getTransactionId(), set); - } - appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set)); - } finally { - this.logReadLock.unlock(); - } - txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis()); - return true; - } - - private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) { - committingChangeSets.put(writePointer, changes); - } - - public boolean commit(Transaction tx) throws TransactionNotInProgressException { - txMetricsCollector.rate("commit"); - Stopwatch timer = new Stopwatch().start(); - Set<ChangeId> changeSet = null; - boolean addToCommitted = true; - long commitPointer; - // guard against changes to the transaction log while processing - this.logReadLock.lock(); - try { - synchronized (this) { - ensureAvailable(); - // we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that - // are no longer visible by any in-progress transactions - commitPointer = lastWritePointer + 1; - if (inProgress.get(tx.getTransactionId()) == 
null) { - // invalid transaction, either this has timed out and moved to invalid, or something else is wrong. - if (invalid.contains(tx.getTransactionId())) { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress " + - "(it is known to be invalid)", tx.getTransactionId())); - } else { - throw new TransactionNotInProgressException( - String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId())); - } - } - - // these should be atomic - // NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot - // be attempted to commit twice - changeSet = committingChangeSets.remove(tx.getTransactionId()); - - if (changeSet != null) { - // double-checking if there are conflicts: someone may have committed since canCommit check - if (hasConflicts(tx, changeSet)) { - return false; - } - } else { - // no changes - addToCommitted = false; - } - doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted); - } - appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted)); - } finally { - this.logReadLock.unlock(); - } - txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis()); - return true; - } - - private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer, - boolean addToCommitted) { - // In case this method is called when loading a previous WAL, we need to remove the tx from these sets - committingChangeSets.remove(transactionId); - if (addToCommitted && !changes.isEmpty()) { - // No need to add empty changes to the committed change sets, they will never trigger any conflict - - // Record the committed change set with the next writePointer as the commit time. 
- // NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be - // stored under one key - Set<ChangeId> changeIds = committedChangeSets.get(commitPointer); - if (changeIds != null) { - // NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. in - // canCommit) use it unguarded - changes.addAll(changeIds); - } - committedChangeSets.put(commitPointer, changes); - } - // remove from in-progress set, so that it does not get excluded in the future - InProgressTx previous = inProgress.remove(transactionId); - if (previous == null) { - // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. - if (invalid.rem(transactionId)) { - invalidArray = invalid.toLongArray(); - LOG.info("Tx invalid list: removed committed tx {}", transactionId); - } - } - // moving read pointer - moveReadPointerIfNeeded(writePointer); - - // All committed change sets that are smaller than the earliest started transaction can be removed. - // here we ignore transactions that have no timeout, they are long-running and don't participate in - // conflict detection. - // TODO: for efficiency, can we do this once per-log in replayLogs instead of once per edit? 
- committedChangeSets.headMap(TxUtils.getFirstShortInProgress(inProgress)).clear(); - } - - public void abort(Transaction tx) { - // guard against changes to the transaction log while processing - txMetricsCollector.rate("abort"); - Stopwatch timer = new Stopwatch().start(); - this.logReadLock.lock(); - try { - synchronized (this) { - ensureAvailable(); - doAbort(tx.getTransactionId(), tx.getCheckpointWritePointers(), tx.getType()); - } - appendToLog(TransactionEdit.createAborted(tx.getTransactionId(), tx.getType(), tx.getCheckpointWritePointers())); - txMetricsCollector.histogram("abort.latency", (int) timer.elapsedMillis()); - } finally { - this.logReadLock.unlock(); - } - } - - private void doAbort(long writePointer, long[] checkpointWritePointers, TransactionType type) { - committingChangeSets.remove(writePointer); - - if (type == TransactionType.LONG) { - // Long running transactions cannot be aborted as their change sets are not saved, - // and hence the changes cannot be rolled back. Invalidate the long running transaction instead. - doInvalidate(writePointer); - return; - } - - // makes tx visible (assumes that all operations were rolled back) - // remove from in-progress set, so that it does not get excluded in the future - InProgressTx removed = inProgress.remove(writePointer); - if (removed == null) { - // tx was not in progress! perhaps it timed out and is invalid? try to remove it there. 
- if (invalid.rem(writePointer)) { - // remove any invalidated checkpoint pointers - // this will only be present if the parent write pointer was also invalidated - if (checkpointWritePointers != null) { - for (int i = 0; i < checkpointWritePointers.length; i++) { - invalid.rem(checkpointWritePointers[i]); - } - } - invalidArray = invalid.toLongArray(); - LOG.info("Tx invalid list: removed aborted tx {}", writePointer); - // removed a tx from excludes: must move read pointer - moveReadPointerIfNeeded(writePointer); - } - } else { - // removed a tx from excludes: must move read pointer - moveReadPointerIfNeeded(writePointer); - } - } - - public boolean invalidate(long tx) { - // guard against changes to the transaction log while processing - txMetricsCollector.rate("invalidate"); - Stopwatch timer = new Stopwatch().start(); - this.logReadLock.lock(); - try { - boolean success; - synchronized (this) { - ensureAvailable(); - success = doInvalidate(tx); - } - appendToLog(TransactionEdit.createInvalid(tx)); - txMetricsCollector.histogram("invalidate.latency", (int) timer.elapsedMillis()); - return success; - } finally { - this.logReadLock.unlock(); - } - } - - private boolean doInvalidate(long writePointer) { - Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer); - // remove from in-progress set, so that it does not get excluded in the future - InProgressTx previous = inProgress.remove(writePointer); - // This check is to prevent from invalidating committed transactions - if (previous != null || previousChangeSet != null) { - // add tx to invalids - invalid.add(writePointer); - if (previous == null) { - LOG.debug("Invalidating tx {} in committing change sets but not in-progress", writePointer); - } else { - // invalidate any checkpoint write pointers - LongArrayList childWritePointers = previous.getCheckpointWritePointers(); - if (childWritePointers != null) { - for (int i = 0; i < childWritePointers.size(); i++) { - 
invalid.add(childWritePointers.get(i)); - } - } - } - LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer); - // todo: find a more efficient way to keep this sorted. Could it just be an array? - Collections.sort(invalid); - invalidArray = invalid.toLongArray(); - if (previous != null && !previous.isLongRunning()) { - // tx was short-running: must move read pointer - moveReadPointerIfNeeded(writePointer); - } - return true; - } - return false; - } - - /** - * Removes the given transaction ids from the invalid list. - * @param invalidTxIds transaction ids - * @return true if invalid list got changed, false otherwise - */ - public boolean truncateInvalidTx(Set<Long> invalidTxIds) { - // guard against changes to the transaction log while processing - txMetricsCollector.rate("truncateInvalidTx"); - Stopwatch timer = new Stopwatch().start(); - this.logReadLock.lock(); - try { - boolean success; - synchronized (this) { - ensureAvailable(); - success = doTruncateInvalidTx(invalidTxIds); - } - appendToLog(TransactionEdit.createTruncateInvalidTx(invalidTxIds)); - txMetricsCollector.histogram("truncateInvalidTx.latency", (int) timer.elapsedMillis()); - return success; - } finally { - this.logReadLock.unlock(); - } - } - - private boolean doTruncateInvalidTx(Set<Long> invalidTxIds) { - LOG.info("Removing tx ids {} from invalid list", invalidTxIds); - boolean success = invalid.removeAll(invalidTxIds); - if (success) { - invalidArray = invalid.toLongArray(); - } - return success; - } - - /** - * Removes all transaction ids started before the given time from invalid list. 
- * @param time time in milliseconds - * @return true if invalid list got changed, false otherwise - * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time - */ - public boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { - // guard against changes to the transaction log while processing - txMetricsCollector.rate("truncateInvalidTxBefore"); - Stopwatch timer = new Stopwatch().start(); - this.logReadLock.lock(); - try { - boolean success; - synchronized (this) { - ensureAvailable(); - success = doTruncateInvalidTxBefore(time); - } - appendToLog(TransactionEdit.createTruncateInvalidTxBefore(time)); - txMetricsCollector.histogram("truncateInvalidTxBefore.latency", (int) timer.elapsedMillis()); - return success; - } finally { - this.logReadLock.unlock(); - } - } - - private boolean doTruncateInvalidTxBefore(long time) throws InvalidTruncateTimeException { - LOG.info("Removing tx ids before {} from invalid list", time); - long truncateWp = time * TxConstants.MAX_TX_PER_MS; - // Check if there any in-progress transactions started earlier than truncate time - if (inProgress.lowerKey(truncateWp) != null) { - throw new InvalidTruncateTimeException("Transactions started earlier than " + time + " are in-progress"); - } - - // Find all invalid transactions earlier than truncateWp - Set<Long> toTruncate = Sets.newHashSet(); - for (long wp : invalid) { - // invalid list is sorted, hence can stop as soon as we reach a wp >= truncateWp - if (wp >= truncateWp) { - break; - } - toTruncate.add(wp); - } - return doTruncateInvalidTx(toTruncate); - } - - public Transaction checkpoint(Transaction originalTx) throws TransactionNotInProgressException { - txMetricsCollector.rate("checkpoint"); - Stopwatch timer = new Stopwatch().start(); - - Transaction checkpointedTx = null; - long txId = originalTx.getTransactionId(); - long newWritePointer = 0; - // guard against changes to the transaction log while 
processing - this.logReadLock.lock(); - try { - synchronized (this) { - ensureAvailable(); - // check that the parent tx is in progress - InProgressTx parentTx = inProgress.get(txId); - if (parentTx == null) { - if (invalid.contains(txId)) { - throw new TransactionNotInProgressException( - String.format("Transaction %d is not in progress because it was invalidated", txId)); - } else { - throw new TransactionNotInProgressException( - String.format("Transaction %d is not in progress", txId)); - } - } - newWritePointer = getNextWritePointer(); - doCheckpoint(newWritePointer, txId); - // create a new transaction with the same read snapshot, plus the additional checkpoint write pointer - // the same read snapshot is maintained to - checkpointedTx = new Transaction(originalTx, newWritePointer, - parentTx.getCheckpointWritePointers().toLongArray()); - } - // appending to WAL out of global lock for concurrent performance - // we should still be able to arrive at the same state even if log entries are out of order - appendToLog(TransactionEdit.createCheckpoint(newWritePointer, txId)); - } finally { - this.logReadLock.unlock(); - } - txMetricsCollector.histogram("checkpoint.latency", (int) timer.elapsedMillis()); - - return checkpointedTx; - } - - private void doCheckpoint(long newWritePointer, long parentWritePointer) { - InProgressTx existingTx = inProgress.get(parentWritePointer); - existingTx.addCheckpointWritePointer(newWritePointer); - advanceWritePointer(newWritePointer); - } - - // hack for exposing important metric - public int getExcludedListSize() { - return invalid.size() + inProgress.size(); - } - - /** - * @return the size of invalid list - */ - public int getInvalidSize() { - return this.invalid.size(); - } - - int getCommittedSize() { - return this.committedChangeSets.size(); - } - - private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) { - if (changeIds.isEmpty()) { - return false; - } - - for (Map.Entry<Long, Set<ChangeId>> changeSet : 
committedChangeSets.entrySet()) { - // If commit time is greater than tx read-pointer, - // basically not visible but committed means "tx committed after given tx was started" - if (changeSet.getKey() > tx.getTransactionId()) { - if (overlap(changeSet.getValue(), changeIds)) { - return true; - } - } - } - return false; - } - - private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) { - // iterate over the smaller set, and check for every element in the other set - if (a.size() > b.size()) { - for (ChangeId change : b) { - if (a.contains(change)) { - return true; - } - } - } else { - for (ChangeId change : a) { - if (b.contains(change)) { - return true; - } - } - } - return false; - } - - private void moveReadPointerIfNeeded(long committedWritePointer) { - if (committedWritePointer > readPointer) { - readPointer = committedWritePointer; - } - } - - /** - * Creates a new Transaction. This method only get called from start transaction, which is already - * synchronized. - */ - private Transaction createTransaction(long writePointer, TransactionType type) { - // For holding the first in progress short transaction Id (with timeout >= 0). 
- long firstShortTx = Transaction.NO_TX_IN_PROGRESS; - LongArrayList inProgressIds = new LongArrayList(inProgress.size()); - for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) { - long txId = entry.getKey(); - inProgressIds.add(txId); - // add any checkpointed write pointers to the in-progress list - LongArrayList childIds = entry.getValue().getCheckpointWritePointers(); - if (childIds != null) { - for (int i = 0; i < childIds.size(); i++) { - inProgressIds.add(childIds.get(i)); - } - } - if (firstShortTx == Transaction.NO_TX_IN_PROGRESS && !entry.getValue().isLongRunning()) { - firstShortTx = txId; - } - } - - return new Transaction(readPointer, writePointer, invalidArray, inProgressIds.toLongArray(), firstShortTx, type); - } - - private void appendToLog(TransactionEdit edit) { - try { - Stopwatch timer = new Stopwatch().start(); - currentLog.append(edit); - txMetricsCollector.rate("wal.append.count"); - txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis()); - } catch (IOException ioe) { - abortService("Error appending to transaction log", ioe); - } - } - - private void appendToLog(List<TransactionEdit> edits) { - try { - Stopwatch timer = new Stopwatch().start(); - currentLog.append(edits); - txMetricsCollector.rate("wal.append.count", edits.size()); - txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis()); - } catch (IOException ioe) { - abortService("Error appending to transaction log", ioe); - } - } - - /** - * Called from the tx service every 10 seconds. - * This hack is needed because current metrics system is not flexible when it comes to adding new metrics. 
- */ - public void logStatistics() { - LOG.info("Transaction Statistics: write pointer = " + lastWritePointer + - ", invalid = " + invalid.size() + - ", in progress = " + inProgress.size() + - ", committing = " + committingChangeSets.size() + - ", committed = " + committedChangeSets.size()); - } - - private abstract static class DaemonThreadExecutor extends Thread { - private AtomicBoolean stopped = new AtomicBoolean(false); - - public DaemonThreadExecutor(String name) { - super(name); - setDaemon(true); - } - - public void run() { - try { - while (!isInterrupted() && !stopped.get()) { - doRun(); - synchronized (stopped) { - stopped.wait(getSleepMillis()); - } - } - } catch (InterruptedException ie) { - LOG.info("Interrupted thread " + getName()); - } - // perform any final cleanup - onShutdown(); - LOG.info("Exiting thread " + getName()); - } - - public abstract void doRun(); - - protected abstract long getSleepMillis(); - - protected void onShutdown() { - } - - public void shutdown() { - if (stopped.compareAndSet(false, true)) { - synchronized (stopped) { - stopped.notifyAll(); - } - } - } - } - - /** - * Represents some of the info on in-progress tx - */ - public static final class InProgressTx { - /** the oldest in progress tx at the time of this tx start */ - private final long visibilityUpperBound; - private final long expiration; - private final TransactionType type; - private LongArrayList checkpointWritePointers = new LongArrayList(); - - public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type) { - this(visibilityUpperBound, expiration, type, new LongArrayList()); - } - - public InProgressTx(long visibilityUpperBound, long expiration, TransactionType type, - LongArrayList checkpointWritePointers) { - this.visibilityUpperBound = visibilityUpperBound; - this.expiration = expiration; - this.type = type; - this.checkpointWritePointers = checkpointWritePointers; - } - - // For backwards compatibility when long running txns were 
represented with -1 expiration - @Deprecated - public InProgressTx(long visibilityUpperBound, long expiration) { - this(visibilityUpperBound, expiration, null); - } - - public long getVisibilityUpperBound() { - return visibilityUpperBound; - } - - public long getExpiration() { - return expiration; - } - - @Nullable - public TransactionType getType() { - return type; - } - - public boolean isLongRunning() { - if (type == null) { - // for backwards compatibility when long running txns were represented with -1 expiration - return expiration == -1; - } - return type == TransactionType.LONG; - } - - public void addCheckpointWritePointer(long checkpointWritePointer) { - checkpointWritePointers.add(checkpointWritePointer); - } - - public LongArrayList getCheckpointWritePointers() { - return checkpointWritePointers; - } - - @Override - public boolean equals(Object o) { - if (o == null || !(o instanceof InProgressTx)) { - return false; - } - - if (this == o) { - return true; - } - - InProgressTx other = (InProgressTx) o; - return Objects.equal(visibilityUpperBound, other.getVisibilityUpperBound()) && - Objects.equal(expiration, other.getExpiration()) && - Objects.equal(type, other.type) && - Objects.equal(checkpointWritePointers, other.checkpointWritePointers); - } - - @Override - public int hashCode() { - return Objects.hashCode(visibilityUpperBound, expiration, type, checkpointWritePointers); - } - - @Override - public String toString() { - return Objects.toStringHelper(this) - .add("visibilityUpperBound", visibilityUpperBound) - .add("expiration", expiration) - .add("type", type) - .add("checkpointWritePointers", checkpointWritePointers) - .toString(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java b/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java deleted file mode 100644 index 8437017..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionServiceMain.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -import co.cask.tephra.distributed.TransactionService; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.inject.Guice; -import com.google.inject.Injector; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.zookeeper.ZKClientService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; - -/** - * Driver class to start and stop tx in distributed mode. - */ -public class TransactionServiceMain { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionServiceMain.class); - - private Configuration conf; - private TransactionService txService; - - public static void main(String args[]) throws Exception { - TransactionServiceMain instance = new TransactionServiceMain(); - instance.doMain(args); - } - - public TransactionServiceMain() { - this(null); - } - - public TransactionServiceMain(Configuration conf) { - this.conf = conf; - } - - /** - * The main method. It simply call methods in the same sequence - * as if the program is started by jsvc. - */ - public void doMain(final String[] args) throws Exception { - final CountDownLatch shutdownLatch = new CountDownLatch(1); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - try { - TransactionServiceMain.this.stop(); - } finally { - try { - TransactionServiceMain.this.destroy(); - } finally { - shutdownLatch.countDown(); - } - } - } catch (Throwable t) { - LOG.error("Exception when shutting down: " + t.getMessage(), t); - } - } - }); - init(args); - start(); - - shutdownLatch.await(); - } - - /** - * Invoked by jsvc to initialize the program. 
- */ - public void init(String[] args) { - if (conf == null) { - conf = new ConfigurationFactory().get(); - } - } - - /** - * Invoked by jsvc to start the program. - */ - public void start() throws Exception { - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - new TransactionModules().getDistributedModules(), - new TransactionClientModule() - ); - - ZKClientService zkClientService = injector.getInstance(ZKClientService.class); - zkClientService.startAndWait(); - - // start a tx server - txService = injector.getInstance(TransactionService.class); - try { - LOG.info("Starting {}", getClass().getSimpleName()); - txService.startAndWait(); - } catch (Exception e) { - System.err.println("Failed to start service: " + e.getMessage()); - } - } - - /** - * Invoked by jsvc to stop the program. - */ - public void stop() { - LOG.info("Stopping {}", getClass().getSimpleName()); - if (txService == null) { - return; - } - try { - if (txService.isRunning()) { - txService.stopAndWait(); - } - } catch (Throwable e) { - LOG.error("Failed to shutdown transaction service.", e); - // because shutdown hooks execute concurrently, the logger may be closed already: thus also print it. - System.err.println("Failed to shutdown transaction service: " + e.getMessage()); - e.printStackTrace(System.err); - } - } - - /** - * Invoked by jsvc for resource cleanup. 
- */ - public void destroy() { } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java deleted file mode 100644 index ca68143..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionSystemClient.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import java.io.InputStream; -import java.util.Collection; -import java.util.Set; - -/** - * Client talking to transaction system. - * See also {@link co.cask.tephra.TransactionAware}. - * todo: explain Omid. - */ -public interface TransactionSystemClient { - /** - * Starts new short transaction. - * @return instance of {@link co.cask.tephra.Transaction} - */ - // TODO: "short" and "long" are very misleading names. Use transaction attributes like "detect conflicts or not", etc. - Transaction startShort(); - - /** - * Starts new short transaction. 
- * @param timeout the timeout for the transaction - * @return instance of {@link Transaction} - */ - Transaction startShort(int timeout); - - /** - * Starts new long transaction. - * @return instance of {@link Transaction} - */ - Transaction startLong(); - - // this pre-commit detects conflicts with other transactions committed so far - // NOTE: the changes set should not change after this operation, this may help us do some extra optimizations - // NOTE: there should be time constraint on how long it takes the client to commit changes after this operation - // is submitted so that we can clean up related resources - // NOTE: as of now you can call this method multiple times, each time the changeSet of tx will be updated. Not sure - // if we can call it a feature or a side effect of implementation. It makes more sense to append changeset, but - // before we really need it we don't do it because it will slow down tx manager throughput. - - /** - * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and - * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call - * and {@link #commit(Transaction)} which may check conflicts again to avoid races. - * <p/> - * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts - * before we persist changes to avoid rollback in case of conflicts as much as possible. - * NOTE: in some situations we may want to skip this step to save on RPC with a risk of many rollback ops. So by - * default we take safe path. - * - * @param tx transaction to verify - * @param changeIds ids of changes made by transaction - * @return true if transaction can be committed otherwise false - */ - boolean canCommit(Transaction tx, Collection<byte[]> changeIds) throws TransactionNotInProgressException; - - /** - * Makes transaction visible. 
It will again check conflicts of changes submitted previously with - * {@link #canCommit(Transaction, java.util.Collection)} - * @param tx transaction to make visible. - * @return true if transaction can be committed otherwise false - */ - boolean commit(Transaction tx) throws TransactionNotInProgressException; - - /** - * Makes transaction visible. You should call it only when all changes of this tx are undone. - * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out. - * @param tx transaction to make visible. - */ - void abort(Transaction tx); - - /** - * Makes transaction invalid. You should call it if not all changes of this tx could be undone. - * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out. - * @param tx transaction id to invalidate. - * @return true if transaction has been successfully invalidated - */ - boolean invalidate(long tx); - - /** - * Performs a checkpoint operation on the current transaction, returning a new Transaction instance with the - * updated state. A checkpoint operation assigns a new write pointer for the current transaction. - * @param tx the current transaction to checkpoint - * @return an updated transaction instance with the new write pointer - */ - Transaction checkpoint(Transaction tx) throws TransactionNotInProgressException; - - /** - * Retrieves the state of the transaction manager and send it as a stream. The snapshot will not be persisted. - * @return an input stream containing an encoded snapshot of the transaction manager - */ - InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException; - - /** - * Return the status of the transaction Manager - * @return a String which denotes the status of txManager - */ - String status(); - - /** - * Resets the state of the transaction manager. - */ - void resetState(); - - /** - * Removes the given transaction ids from the invalid list. 
- * @param invalidTxIds transaction ids - * @return true if invalid list got changed, false otherwise - */ - boolean truncateInvalidTx(Set<Long> invalidTxIds); - - /** - * Removes all transaction ids started before the given time from invalid list. - * @param time time in milliseconds - * @return true if invalid list got changed, false otherwise - * @throws InvalidTruncateTimeException if there are any in-progress transactions started before given time - */ - boolean truncateInvalidTxBefore(long time) throws InvalidTruncateTimeException; - - /** - * @return the size of invalid list - */ - int getInvalidSize(); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java b/tephra-core/src/main/java/co/cask/tephra/TxConstants.java deleted file mode 100644 index 7c96c85..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TxConstants.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -import co.cask.tephra.snapshot.DefaultSnapshotCodec; -import co.cask.tephra.snapshot.SnapshotCodecV2; -import co.cask.tephra.snapshot.SnapshotCodecV3; -import co.cask.tephra.snapshot.SnapshotCodecV4; - -import java.util.concurrent.TimeUnit; - -/** - * Transaction system constants - */ -public class TxConstants { - /** - * Defines what level of conflict detection should be used for transactions. {@code ROW} means that only the - * table name and row key for each change will be used to determine if the transaction change sets conflict. - * {@code COLUMN} means that the table name, row key, column family, and column qualifier will all be used to - * identify write conflicts. {@code NONE} means that no conflict detection will be performed, but transaction - * clients will still track the current transaction's change set to rollback any persisted changes in the event of - * a failure. This should only be used where writes to the same coordinate should never conflict, such as - * append-only data. The default value used by {@code TransactionAwareHTable} implementations is {@code COLUMN}. - * - * <p> - * <strong>Note: for a given table, all clients must use the same conflict detection setting!</strong> - * Otherwise conflicts will not be flagged correctly. - * </p> - */ - public enum ConflictDetection { - ROW, - COLUMN, - NONE - } - - /** - * Property set for {@code org.apache.hadoop.hbase.HColumnDescriptor} to configure time-to-live on data within - * the column family. The value given is in milliseconds. Once a cell's data has surpassed the given value in age, - * the cell's data will no longer be visible and may be garbage collected. - */ - public static final String PROPERTY_TTL = "dataset.table.ttl"; - - /** - * This is how many tx we allow per millisecond, if you care about the system for 100 years: - * Long.MAX_VALUE / (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(365 * 100)) = - * (as of Feb 20, 2014) 2,028,653. 
It is safe and convenient to use 1,000,000 as multiplier: - * <ul> - * <li> - * we hardly can do more than 1 billion txs per second - * </li> - * <li> - * long value will not overflow for 200 years - * </li> - * <li> - * makes reading & debugging easier if multiplier is 10^n - * </li> - * </ul> - */ - public static final long MAX_TX_PER_MS = 1000000; - - /** - * Since HBase {@code Delete} operations cannot be undone at the same timestamp, "deleted" data is instead - * overwritten with an empty {@code byte[]} to flag it as removed. Cells with empty values will be filtered out - * of the results for read operations. If cells with empty values should be included in results (meaning data - * cannot be transactionally deleted), then set this configuration property to true. - */ - public static final String ALLOW_EMPTY_VALUES_KEY = "data.tx.allow.empty.values"; - public static final boolean ALLOW_EMPTY_VALUES_DEFAULT = false; - - /** - * Key used to set the serialized transaction as an attribute on Get and Scan operations. - */ - public static final String TX_OPERATION_ATTRIBUTE_KEY = "cask.tx"; - /** - * Key used to flag a delete operation as part of a transaction rollback. This is used so that the - * {@code TransactionProcessor} coprocessor loaded on a table can differentiate between deletes issued - * as part of a normal client operation versus those performed when rolling back a transaction. - */ - public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "cask.tx.rollback"; - - /** - * Column qualifier used for a special delete marker tombstone, which identifies an entire column family as deleted. - */ - public static final byte[] FAMILY_DELETE_QUALIFIER = new byte[0]; - - // Constants for monitoring status - public static final String STATUS_OK = "OK"; - public static final String STATUS_NOTOK = "NOTOK"; - - /** - * Indicates whether data written before Tephra was enabled on a table should be readable. 
Reading non-transactional - * data can lead to slight performance penalty. Hence it is disabled by default. - * @see <a href="https://issues.cask.co/browse/TEPHRA-89">TEPHRA-89</a> - */ - public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; - public static final boolean DEFAULT_READ_NON_TX_DATA = false; - - /** - * TransactionManager configuration. - */ - public static final class Manager { - // TransactionManager configuration - public static final String CFG_DO_PERSIST = "tx.persist"; - /** Directory in HDFS used for transaction snapshot and log storage. */ - public static final String CFG_TX_SNAPSHOT_DIR = "data.tx.snapshot.dir"; - /** Directory on the local filesystem used for transaction snapshot and log storage. */ - public static final String CFG_TX_SNAPSHOT_LOCAL_DIR = "data.tx.snapshot.local.dir"; - /** How often to clean up timed out transactions, in seconds, or 0 for no cleanup. */ - public static final String CFG_TX_CLEANUP_INTERVAL = "data.tx.cleanup.interval"; - /** The user id to access HDFS if not running in secure HDFS. */ - public static final String CFG_TX_HDFS_USER = "data.tx.hdfs.user"; - /** Default value for how often to check in-progress transactions for expiration, in seconds. */ - public static final int DEFAULT_TX_CLEANUP_INTERVAL = 10; - /** - * The timeout for a transaction, in seconds. If the transaction is not finished in that time, - * it is marked invalid. - */ - public static final String CFG_TX_TIMEOUT = "data.tx.timeout"; - /** Default value for transaction timeout, in seconds. */ - public static final int DEFAULT_TX_TIMEOUT = 30; - /** - * The timeout for a long running transaction, in seconds. If the transaction is not finished in that time, - * it is marked invalid. - */ - public static final String CFG_TX_LONG_TIMEOUT = "data.tx.long.timeout"; - /** Default value for long running transaction timeout, in seconds. 
*/ - public static final int DEFAULT_TX_LONG_TIMEOUT = (int) TimeUnit.DAYS.toSeconds(1); - /** The frequency (in seconds) to perform periodic snapshots, or 0 for no periodic snapshots. */ - public static final String CFG_TX_SNAPSHOT_INTERVAL = "data.tx.snapshot.interval"; - /** Default value for frequency of periodic snapshots of transaction state. */ - public static final long DEFAULT_TX_SNAPSHOT_INTERVAL = 300; - /** Number of most recent transaction snapshots to retain. */ - public static final String CFG_TX_SNAPSHOT_RETAIN = "data.tx.snapshot.retain"; - /** Default value for number of most recent snapshots to retain. */ - public static final int DEFAULT_TX_SNAPSHOT_RETAIN = 10; - } - - /** - * TransactionService configuration. - */ - public static final class Service { - - /** for the zookeeper quorum string for leader election for tx server. */ - public static final String CFG_DATA_TX_ZOOKEEPER_QUORUM - = "data.tx.zookeeper.quorum"; - - /** for the name used to announce service availability to discovery service */ - public static final String CFG_DATA_TX_DISCOVERY_SERVICE_NAME - = "data.tx.discovery.service.name"; - - /** for the port of the tx server. */ - public static final String CFG_DATA_TX_BIND_PORT - = "data.tx.bind.port"; - - /** for the address (hostname) of the tx server. */ - public static final String CFG_DATA_TX_BIND_ADDRESS - = "data.tx.bind.address"; - - /** the number of IO threads in the tx service. */ - public static final String CFG_DATA_TX_SERVER_IO_THREADS - = "data.tx.server.io.threads"; - - /** the number of handler threads in the tx service. */ - public static final String CFG_DATA_TX_SERVER_THREADS - = "data.tx.server.threads"; - - public static final String CFG_DATA_TX_THRIFT_MAX_READ_BUFFER - = "data.tx.thrift.max.read.buffer"; - - public static final String DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME - = "transaction"; - - /** default tx service port. 
*/ - public static final int DEFAULT_DATA_TX_BIND_PORT - = 15165; - - /** default tx service address. */ - public static final String DEFAULT_DATA_TX_BIND_ADDRESS - = "0.0.0.0"; - - /** default number of handler IO threads in tx service. */ - public static final int DEFAULT_DATA_TX_SERVER_IO_THREADS - = 2; - - /** default number of handler threads in tx service. */ - public static final int DEFAULT_DATA_TX_SERVER_THREADS - = 20; - - /** default thrift max read buffer size */ - public static final int DEFAULT_DATA_TX_THRIFT_MAX_READ_BUFFER - = 16 * 1024 * 1024; - - // Configuration key names and defaults used by tx client. - - /** to specify the tx client socket timeout in ms. */ - public static final String CFG_DATA_TX_CLIENT_TIMEOUT - = "data.tx.client.timeout"; - - /** to specify the tx client provider strategy. */ - public static final String CFG_DATA_TX_CLIENT_PROVIDER - = "data.tx.client.provider"; - - /** to specify the number of threads for client provider "pool". */ - public static final String CFG_DATA_TX_CLIENT_COUNT - = "data.tx.client.count"; - - /** timeout (in milliseconds) for obtaining client from client provider "pool". */ - public static final String CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS - = "data.tx.client.obtain.timeout"; - - /** to specify the retry strategy for a failed thrift call. */ - public static final String CFG_DATA_TX_CLIENT_RETRY_STRATEGY - = "data.tx.client.retry.strategy"; - - /** to specify the number of times to retry a failed thrift call. */ - public static final String CFG_DATA_TX_CLIENT_ATTEMPTS - = "data.tx.client.retry.attempts"; - - /** to specify the initial sleep time for retry strategy backoff. */ - public static final String CFG_DATA_TX_CLIENT_BACKOFF_INITIAL - = "data.tx.client.retry.backoff.initial"; - - /** to specify the backoff factor for retry strategy backoff. 
*/ - public static final String CFG_DATA_TX_CLIENT_BACKOFF_FACTOR - = "data.tx.client.retry.backoff.factor"; - - /** to specify the sleep time limit for retry strategy backoff. */ - public static final String CFG_DATA_TX_CLIENT_BACKOFF_LIMIT - = "data.tx.client.retry.backoff.limit"; - - /** the default tx client socket timeout in milli seconds. */ - public static final int DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS - = 30 * 1000; - - /** default number of tx clients for client provider "pool". */ - public static final int DEFAULT_DATA_TX_CLIENT_COUNT - = 50; - - /** default timeout (in milliseconds) for obtaining client from client provider "pool". */ - public static final long DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS - = TimeUnit.SECONDS.toMillis(3); - - /** default tx client provider strategy. */ - public static final String DEFAULT_DATA_TX_CLIENT_PROVIDER - = "pool"; - - /** retry strategy for thrift clients, e.g. backoff, or n-times. */ - public static final String DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY - = "backoff"; - - /** default number of attempts for strategy n-times. */ - public static final int DEFAULT_DATA_TX_CLIENT_ATTEMPTS - = 2; - - /** default initial sleep is 100ms. */ - public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL - = 100; - - /** default backoff factor is 4. */ - public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR - = 4; - - /** default sleep limit is 30 sec. */ - public static final int DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT - = 30 * 1000; - } - - /** - * Configuration properties for metrics reporting - */ - public static final class Metrics { - /** - * Frequency at which metrics should be reported, in seconds. - */ - public static final String REPORT_PERIOD_KEY = "data.tx.metrics.period"; - /** - * Default report period for metrics, in seconds. 
- */ - public static final int REPORT_PERIOD_DEFAULT = 60; - } - - /** - * Configuration properties used by HBase - */ - public static final class HBase { - public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; - public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000; - } - - /** - * Configuration for the TransactionDataJanitor coprocessor. - */ - public static final class DataJanitor { - /** - * Whether or not the TransactionDataJanitor coprocessor should be enabled on tables. - * Disable for testing. - */ - public static final String CFG_TX_JANITOR_ENABLE = "data.tx.janitor.enable"; - public static final boolean DEFAULT_TX_JANITOR_ENABLE = true; - } - - /** - * Configuration for the transaction snapshot persistence. - */ - public static final class Persist { - /** - * The class names of all known transaction snapshot codecs. - */ - public static final String CFG_TX_SNAPHOT_CODEC_CLASSES = "data.tx.snapshot.codecs"; - public static final Class[] DEFAULT_TX_SNAPHOT_CODEC_CLASSES = - { DefaultSnapshotCodec.class, SnapshotCodecV2.class, SnapshotCodecV3.class, SnapshotCodecV4.class }; - } - - /** - * Configuration for transaction log edit entries - */ - public static final class TransactionLog { - /** - * Key used to denote the number of entries appended. - */ - public static final String NUM_ENTRIES_APPENDED = "count"; - public static final String VERSION_KEY = "version"; - public static final byte CURRENT_VERSION = 2; - } - -}
