http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java 
b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
new file mode 100644
index 0000000..4c33538
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Retry strategy for failed transactions.
+ */
+public interface RetryStrategy {
+  /**
+   * Returns the number of milliseconds to wait before retrying the operation.
+   *
+   * @param reason Reason for transaction failure.
+   * @param failureCount Number of times that the request has failed.
+   * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means
+   *         retry it immediately, while negative means abort the operation.
+   */
+  long nextRetry(TransactionFailureException reason, int failureCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
new file mode 100644
index 0000000..e5902a7
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.twill.zookeeper.ZKClientService;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Allows calling some methods on {@link TransactionManager} from command line.
+ */
+public class TransactionAdmin {
+  private static final String OPT_TRUNCATE_INVALID_TX = 
"--truncate-invalid-tx";
+  private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = 
"--truncate-invalid-tx-before";
+  private static final String OPT_GET_INVALID_TX_SIZE = 
"--get-invalid-tx-size";
+  
+  private final PrintStream out;
+  private final PrintStream err;
+
+  /**
+   * Command line entry point. Runs the requested admin operation with a default {@link Configuration},
+   * writing to the standard streams, and exits the JVM with the status returned by the operation.
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+    TransactionAdmin admin = new TransactionAdmin(System.out, System.err);
+    System.exit(admin.doMain(args, new Configuration()));
+  }
+
+  /**
+   * Creates an admin tool that reports on the given streams.
+   *
+   * @param out stream that receives the results of successful operations
+   * @param err stream that receives usage information and error messages
+   */
+  public TransactionAdmin(PrintStream out, PrintStream err) {
+    this.err = err;
+    this.out = out;
+  }
+
+  /**
+   * Runs the admin operation selected by {@code args[0]} and reports the outcome on the configured streams.
+   * A ZooKeeper client is started for the duration of the call and stopped again before returning.
+   *
+   * @param args the option, followed by its argument where one is required
+   * @param conf configuration used to build the client modules
+   * @return {@code 0} if the operation completed; {@code 1} on bad usage or on unparsable or rejected input
+   */
+  @VisibleForTesting
+  int doMain(String[] args, Configuration conf) {
+    if (args.length == 0) {
+      printUsage();
+      return 1;
+    }
+
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    try {
+      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
+      String option = args[0];
+      boolean hasSingleArgument = args.length == 2;
+
+      if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
+        if (!hasSingleArgument) {
+          printUsage();
+          return 1;
+        }
+        Set<Long> txIds;
+        try {
+          txIds = parseTxIds(args[1]);
+        } catch (NumberFormatException e) {
+          err.println("NumberFormatException: " + e.getMessage());
+          return 1;
+        }
+        if (txIds.isEmpty()) {
+          // nothing to truncate
+          return 0;
+        }
+        out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+        txClient.truncateInvalidTx(txIds);
+        out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+        return 0;
+      }
+
+      if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
+        if (!hasSingleArgument) {
+          printUsage();
+          return 1;
+        }
+        try {
+          long time = Long.parseLong(args[1]);
+          out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
+          txClient.truncateInvalidTxBefore(time);
+          out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
+        } catch (InvalidTruncateTimeException e) {
+          err.println(e.getMessage());
+          return 1;
+        } catch (NumberFormatException e) {
+          err.println("NumberFormatException: " + e.getMessage());
+          return 1;
+        }
+        return 0;
+      }
+
+      if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
+        if (args.length != 1) {
+          printUsage();
+          return 1;
+        }
+        out.println("Invalid list size: " + txClient.getInvalidSize());
+        return 0;
+      }
+
+      // unknown option
+      printUsage();
+      return 1;
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+  
+  /**
+   * Parses a comma separated list of transaction ids.
+   * Whitespace around each id is ignored and empty entries (for example from a trailing comma) are skipped,
+   * so the result may be empty.
+   *
+   * @param option the comma separated ids, for example {@code "1,2,3"} or {@code "1, 2, 3"}
+   * @return the distinct ids that were listed
+   * @throws NumberFormatException if any non-empty entry is not a valid {@code long}
+   */
+  private Set<Long> parseTxIds(String option) throws NumberFormatException {
+    Set<Long> txIds = Sets.newHashSet();
+    for (String str : Splitter.on(',').trimResults().omitEmptyStrings().split(option)) {
+      txIds.add(Long.parseLong(str));
+    }
+    return txIds;
+  }
+  
+  /**
+   * Prints the supported command lines to the error stream, one per line.
+   */
+  private void printUsage() {
+    String name = TransactionAdmin.class.getSimpleName();
+    String indent = "     ";
+    List<String> lines = Lists.newArrayList(
+      join("Usage: "),
+      join(indent, name, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"),
+      join(indent, name, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"),
+      join(indent, name, OPT_GET_INVALID_TX_SIZE)
+    );
+
+    err.println(Joiner.on(System.getProperty("line.separator")).join(lines));
+  }
+  
+  /**
+   * Concatenates the given parts, separated by a single space.
+   */
+  private static String join(String... args) {
+    Joiner spaceJoiner = Joiner.on(" ");
+    return spaceJoiner.join(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
new file mode 100644
index 0000000..c147917
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import org.apache.tephra.distributed.TransactionConverterUtils;
+import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import java.io.IOException;
+
+/**
+ * Handles serialization and deserialization of {@link Transaction} instances 
to and from {@code byte[]}.
+ */
+public class TransactionCodec {
+
+  public TransactionCodec() {
+  }
+
+  public byte[] encode(Transaction tx) throws IOException {
+    TTransaction thriftTx = TransactionConverterUtils.wrap(tx);
+    TSerializer serializer = new TSerializer();
+    try {
+      return serializer.serialize(thriftTx);
+    } catch (TException te) {
+      throw new IOException(te);
+    }
+  }
+
+  public Transaction decode(byte[] encoded) throws IOException {
+    TTransaction thriftTx = new TTransaction();
+    TDeserializer deserializer = new TDeserializer();
+    try {
+      deserializer.deserialize(thriftTx, encoded);
+      return TransactionConverterUtils.unwrap(thriftTx);
+    } catch (TException te) {
+      throw new IOException(te);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
new file mode 100644
index 0000000..22a59c6
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import javax.annotation.Nullable;
+
+/**
+ * Utility class that encapsulates the transaction life cycle over a given set 
of
+ * transaction-aware datasets. It is not thread-safe for concurrent execution.
+ */
+public class TransactionContext {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TransactionContext.class);
+
+  private final Collection<TransactionAware> txAwares;
+  private final TransactionSystemClient txClient;
+
+  private Transaction currentTx;
+
+  /**
+   * Creates a context over the given participants.
+   *
+   * @param txClient client used to talk to the transaction system
+   * @param txAwares the initial participants
+   */
+  public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) {
+    this(txClient, ImmutableList.copyOf(txAwares));
+  }
+
+  /**
+   * Creates a context over the given participants.
+   *
+   * @param txClient client used to talk to the transaction system
+   * @param txAwares the initial participants; duplicates are dropped and the iteration order is kept
+   */
+  public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) {
+    this.txClient = txClient;
+    // A linked hash set drops duplicate TransactionAwares and makes removal fast, while still
+    // iterating in insertion order (same behavior as when it was using a List).
+    this.txAwares = Sets.newLinkedHashSet(txAwares);
+  }
+
+  /**
+   * Registers an additional participant with this context. A participant that was not registered before
+   * joins the active transaction right away, if there is one.
+   *
+   * @param txAware the transaction-aware to register
+   * @return {@code true} if the transaction-aware was newly added; {@code false} if it was already registered
+   */
+  public boolean addTransactionAware(TransactionAware txAware) {
+    if (!txAwares.add(txAware)) {
+      return false;
+    }
+    if (currentTx != null) {
+      // let the newcomer take part in the transaction that is already running
+      txAware.startTx(currentTx);
+    }
+    return true;
+  }
+
+  /**
+   * Removes a {@link TransactionAware} and withdraws from participation in 
the transaction.
+   * Withdrawal is only allowed if there is no active transaction.
+   *
+   * @param txAware the {@link TransactionAware} to be removed
+   * @return true if the given {@link TransactionAware} is removed; false 
otherwise.
+   * @throws IllegalStateException if there is an active transaction going on 
with this TransactionContext.
+   */
+  public boolean removeTransactionAware(TransactionAware txAware) {
+    Preconditions.checkState(currentTx == null, "Cannot remove 
TransactionAware while there is an active transaction.");
+    return txAwares.remove(txAware);
+  }
+
+  /**
+   * Starts a new transaction.  Calling this will initiate a new transaction using the
+   * {@link TransactionSystemClient}, and pass the returned transaction to
+   * {@link TransactionAware#startTx(Transaction)} for each registered TransactionAware.  If an exception is
+   * encountered, the transaction will be aborted, this context will no longer hold a current transaction, and a
+   * {@code TransactionFailureException} wrapping the root cause will be thrown.
+   *
+   * @throws TransactionFailureException if an exception occurs starting the transaction with any registered
+   *     TransactionAware
+   */
+  public void start() throws TransactionFailureException {
+    currentTx = txClient.startShort();
+    for (TransactionAware txAware : txAwares) {
+      try {
+        txAware.startTx(currentTx);
+      } catch (Throwable e) {
+        String message = String.format("Unable to start transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        try {
+          txClient.abort(currentTx);
+        } finally {
+          // The transaction is unusable from here on. Clear it so that this context does not keep
+          // reporting an aborted transaction as active (which would also block removeTransactionAware).
+          currentTx = null;
+        }
+        throw new TransactionFailureException(message, e);
+      }
+    }
+  }
+
+  /**
+   * Commits the current transaction.  This will: check for any conflicts, based on the change set aggregated from
+   * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s;
+   * commit the current transaction with the {@link TransactionSystemClient}; run the post-commit hooks;
+   * and clear the current transaction state. The current transaction is cleared even if a post-commit hook
+   * fails, because the transaction has already been committed at that point.
+   *
+   * @throws IllegalStateException if no transaction has been started
+   * @throws TransactionConflictException if a conflict is detected with a recently committed transaction
+   * @throws TransactionFailureException if an error occurs while committing
+   */
+  public void finish() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started");
+    // each of these steps will abort and roll back the tx in case of errors, and throw an exception
+    checkForConflicts();
+    persist();
+    commit();
+    try {
+      postCommit();
+    } finally {
+      // The tx is committed by now. Forget it even when a post-commit hook throws, so that a later
+      // abort() from a generic exception handler cannot roll back or abort a committed transaction.
+      currentTx = null;
+    }
+  }
+
+  /**
+   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback. Equivalent to calling {@link #abort(TransactionFailureException)} with {@code null}.
+   *
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort() throws TransactionFailureException {
+    abort(null);
+  }
+
+  /**
+   * Checkpoints the current transaction by flushing any pending writes for the registered
+   * {@link TransactionAware} instances, and obtaining a new current write pointer for the transaction.
+   * By performing a checkpoint, the client can ensure that all previous writes were flushed and are visible.
+   * By default, the current write pointer for the transaction is also visible.  The current write pointer
+   * can be excluded from read operations by calling
+   * {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set
+   * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created
+   * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}.
+   *
+   * After the checkpoint operation is performed, the updated
+   * {@link Transaction} instance will be passed to {@link TransactionAware#updateTx(Transaction)} for each
+   * registered {@code TransactionAware} instance. If flushing, the checkpoint call, or updating a
+   * {@code TransactionAware} fails, the transaction is aborted.
+   *
+   * @throws IllegalStateException if no transaction has been started
+   * @throws TransactionFailureException if an error occurs while performing the checkpoint
+   */
+  public void checkpoint() throws TransactionFailureException {
+    Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has 
not been started");
+    persist();
+    try {
+      currentTx = txClient.checkpoint(currentTx);
+      // update the current transaction with all TransactionAwares
+      for (TransactionAware txAware : txAwares) {
+        txAware.updateTx(currentTx);
+      }
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", 
currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from checkpoint for 
transaction %d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+  }
+
+  /**
+   * Returns the current transaction or null if no transaction is currently in progress.
+   *
+   * @return the transaction currently held by this context, or {@code null} if there is none
+   */
+  @Nullable
+  public Transaction getCurrentTransaction() {
+    return currentTx;
+  }
+
+  // CHECKSTYLE IGNORE "@throws" FOR 11 LINES
+  /**
+   * Aborts the current transaction, and rolls back all data set changes. If rollback fails,
+   * the transaction is invalidated. If an exception is caught during rollback, the exception
+   * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have
+   * completed rollback. If an existing exception is passed in, that exception is thrown in either
+   * case, whether the rollback is successful or not. In other words, this method always throws the
+   * first exception that it encounters. If no transaction is active, this method returns without
+   * throwing; otherwise the current transaction is cleared before the method exits.
+   * @param cause the original exception that caused the abort
+   * @throws TransactionFailureException for any exception that is encountered.
+   */
+  public void abort(TransactionFailureException cause) throws 
TransactionFailureException {
+    if (currentTx == null) {
+      // might be called by some generic exception handler even though already 
aborted/finished - we allow that
+      return;
+    }
+    try {
+      boolean success = true;
+      for (TransactionAware txAware : txAwares) {
+        try {
+          if (!txAware.rollbackTx()) {
+            success = false;
+          }
+        } catch (Throwable e) {
+          String message = String.format("Unable to roll back changes in 
transaction-aware '%s' for transaction %d. ",
+                                         txAware.getTransactionAwareName(), 
currentTx.getTransactionId());
+          LOG.warn(message, e);
+          if (cause == null) {
+            cause = new TransactionFailureException(message, e);
+          }
+          success = false;
+        }
+      }
+      if (success) {
+        txClient.abort(currentTx);
+      } else {
+        txClient.invalidate(currentTx.getTransactionId());
+      }
+      if (cause != null) {
+        throw cause;
+      }
+    } finally {
+      currentTx = null;
+    }
+  }
+
+  private void checkForConflicts() throws TransactionFailureException {
+    Collection<byte[]> changes = Lists.newArrayList();
+    for (TransactionAware txAware : txAwares) {
+      try {
+        changes.addAll(txAware.getTxChanges());
+      } catch (Throwable e) {
+        String message = String.format("Unable to retrieve changes from 
transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), 
currentTx.getTransactionId());
+        LOG.warn(message, e);
+        abort(new TransactionFailureException(message, e));
+        // abort will throw that exception
+      }
+    }
+
+    boolean canCommit = false;
+    try {
+      canCommit = txClient.canCommit(currentTx, changes);
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", 
currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from canCommit for transaction 
%d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+    if (!canCommit) {
+      String message = String.format("Conflict detected for transaction %d.", 
currentTx.getTransactionId());
+      abort(new TransactionConflictException(message));
+      // abort will throw
+    }
+  }
+
+  /**
+   * Asks every participant to persist its pending changes. The first participant that reports failure, or
+   * throws, causes the transaction to be aborted.
+   *
+   * @throws TransactionFailureException if any participant fails to persist its changes
+   */
+  private void persist() throws TransactionFailureException {
+    for (TransactionAware participant : txAwares) {
+      Throwable failure = null;
+      boolean persisted = false;
+      try {
+        persisted = participant.commitTx();
+      } catch (Throwable t) {
+        failure = t;
+      }
+      if (persisted) {
+        continue;
+      }
+
+      String error = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. ",
+                                   participant.getTransactionAwareName(), currentTx.getTransactionId());
+      if (failure != null) {
+        LOG.warn(error, failure);
+      } else {
+        LOG.warn(error);
+      }
+      // abort rethrows the exception handed to it
+      abort(new TransactionFailureException(error, failure));
+    }
+  }
+
+  private void commit() throws TransactionFailureException {
+    boolean commitSuccess = false;
+    try {
+      commitSuccess = txClient.commit(currentTx);
+    } catch (TransactionNotInProgressException e) {
+      String message = String.format("Transaction %d is not in progress.", 
currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    } catch (Throwable e) {
+      String message = String.format("Exception from commit for transaction 
%d.", currentTx.getTransactionId());
+      LOG.warn(message, e);
+      abort(new TransactionFailureException(message, e));
+      // abort will throw that exception
+    }
+    if (!commitSuccess) {
+      String message = String.format("Conflict detected for transaction %d.", 
currentTx.getTransactionId());
+      abort(new TransactionConflictException(message));
+      // abort will throw
+    }
+  }
+
+  /**
+   * Invokes {@link TransactionAware#postTxCommit()} on every registered participant. All participants are
+   * called even if an earlier one fails; every failure is logged, and the first one is thrown once all
+   * participants have been called (the same first-exception-wins rule that
+   * {@link #abort(TransactionFailureException)} follows).
+   *
+   * @throws TransactionFailureException wrapping the first exception thrown by a post-commit hook
+   */
+  private void postCommit() throws TransactionFailureException {
+    TransactionFailureException cause = null;
+    for (TransactionAware txAware : txAwares) {
+      try {
+        txAware.postTxCommit();
+      } catch (Throwable e) {
+        String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. ",
+                                       txAware.getTransactionAwareName(), currentTx.getTransactionId());
+        LOG.warn(message, e);
+        if (cause == null) {
+          // keep the first failure; later ones are only logged
+          cause = new TransactionFailureException(message, e);
+        }
+      }
+    }
+    if (cause != null) {
+      throw cause;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
 
b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
new file mode 100644
index 0000000..3af9596
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when taking a snapshot fails.
+ */
+public class TransactionCouldNotTakeSnapshotException extends Exception {
+  public TransactionCouldNotTakeSnapshotException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
new file mode 100644
index 0000000..ed5029e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Utility that wraps the execution of a function into the context of a transaction.
+ */
+// todo: implementations should throw different from 
TransactionFailureException in case of user code error?
+// todo: accept only Callable? Executors util has a way to convert everything 
to Callable...
+public interface TransactionExecutor {
+
+  /**
+   * A function is a class with a single method that takes an argument and returns a result.
+   * @param <I> the type of the argument
+   * @param <O> the type of the result
+   */
+  public interface Function<I, O> {
+    O apply(I input) throws Exception;
+  }
+
+  /**
+   * A procedure is a class with a single void method that takes an argument.
+   * @param <I> the type of the argument
+   */
+  public interface Procedure<I> {
+    void apply(I input) throws Exception;
+  }
+
+  /**
+   * A subroutine is a class with a single void method without arguments.
+   */
+  public interface Subroutine {
+    void apply() throws Exception;
+  }
+
+  // Due to a bug in checkstyle, it would emit false positives here of the form
+  // "Unable to get class information for @throws tag '<exn>' (...)".
+  // This comment disables that check up to the corresponding ON comments below
+
+  // CHECKSTYLE OFF: @throws
+
+  /**
+   * Execute a function under transactional semantics. A transaction is started and all datasets
+   * are initialized with the transaction. Then the passed function is executed, the transaction
+   * is committed, and the function return value is returned as the return value of this method.
+   * If any exception is caught, the transaction is aborted and the original exception is rethrown,
+   * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict,
+   * a TransactionConflictException is thrown.
+   * @param function the function to execute
+   * @param input the input parameter for the function
+   * @param <I> the input type of the function
+   * @param <O> the result type of the function
+   * @return the function's return value
+   * @throws TransactionConflictException if there is a write conflict with another transaction.
+   * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets.
+   * @throws InterruptedException declared for implementations that can be interrupted; the exact conditions
+   *     depend on the implementation.
+   */
+  <I, O> O execute(Function<I, O> function, I input) throws 
TransactionFailureException, InterruptedException;
+
+  // CHECKSTYLE ON
+
+  /**
+   * Like {@link #execute(Function, Object)} but without a return value.
+   */
+  <I> void execute(Procedure<I> procedure, I input) throws 
TransactionFailureException, InterruptedException;
+
+  /**
+   * Like {@link #execute(Function, Object)} but the callable has no argument.
+   */
+  <O> O execute(Callable<O> callable) throws TransactionFailureException, 
InterruptedException;
+
+  /**
+   * Like {@link #execute(Function, Object)} but without argument or return value.
+   */
+  void execute(Subroutine subroutine) throws TransactionFailureException, 
InterruptedException;
+
+  /**
+   * Same as {@link #execute(Function, Object)} but declares no checked exceptions:
+   * any exception is rethrown with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <I, O> O executeUnchecked(Function<I, O> function, I input);
+
+  /**
+   * Same as {@link #execute(Procedure, Object)} but declares no checked exceptions:
+   * any exception is rethrown with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <I> void executeUnchecked(Procedure<I> procedure, I input);
+
+  /**
+   * Same as {@link #execute(Callable)} but declares no checked exceptions:
+   * any exception is rethrown with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  <O> O executeUnchecked(Callable<O> callable);
+
+  /**
+   * Same as {@link #execute(Subroutine)} but declares no checked exceptions:
+   * any exception is rethrown with {@link com.google.common.base.Throwables#propagate(Throwable)}.
+   */
+  void executeUnchecked(Subroutine subroutine);
+
+  /**
+   * Same as {@link #execute(Function, Object)} but executes asynchronously.
+   */
+  <I, O> ListenableFuture<O> submit(Function<I, O> function, I input);
+
+  /**
+   * Same as {@link #execute(Procedure, Object)} but executes asynchronously.
+   */
+  <I> ListenableFuture<?> submit(Procedure<I> procedure, I input);
+
+  /**
+   * Same as {@link #execute(Callable)} but executes asynchronously.
+   */
+  <O> ListenableFuture<O> submit(Callable<O> callable);
+
+  /**
+   * Same as {@link #execute(Subroutine)} but executes asynchronously.
+   */
+  ListenableFuture<?> submit(Subroutine subroutine);
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java 
b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
new file mode 100644
index 0000000..afe0f33
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * A factory for transaction executors. {@link #createExecutor(Iterable)} returns a
+ * {@link TransactionExecutor} for the given {@link TransactionAware} instances.
+ */
+public interface TransactionExecutorFactory {
+
+  TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares);
+
+}

Reply via email to