http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 
b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
new file mode 100644
index 0000000..f1d62b8
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.ConfigurationFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Periodically refreshes transaction state from the latest stored snapshot.  
This is implemented as a singleton
+ * to allow a single cache to be shared by all regions on a regionserver.
+ */
+public class TransactionStateCache extends AbstractIdleService implements 
Configurable {
+  private static final Log LOG = 
LogFactory.getLog(TransactionStateCache.class);
+
+  // how frequently we should wake to check for changes (in seconds)
+  private static final long CHECK_FREQUENCY = 15;
+
+  private Configuration hConf;
+
+  private TransactionStateStorage storage;
+  private volatile TransactionVisibilityState latestState;
+
+  private Thread refreshService;
+  private long lastRefresh;
+  // snapshot refresh frequency in milliseconds
+  private long snapshotRefreshFrequency;
+  private boolean initialized;
+
+  public TransactionStateCache() {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hConf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.hConf = conf;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    refreshState();
+    startRefreshService();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    this.refreshService.interrupt();
+    this.storage.stop();
+  }
+
+  /**
+   * Try to initialize the Configuration and TransactionStateStorage 
instances.  Obtaining the Configuration may
+   * fail until ReactorServiceMain has been started.
+   */
+  private void tryInit() {
+    try {
+      Configuration conf = getSnapshotConfiguration();
+      if (conf != null) {
+        // Since this is only used for background loading of transaction 
snapshots, we use the no-op metrics collector,
+        // as there are no relevant metrics to report
+        this.storage = new HDFSTransactionStateStorage(conf, new 
SnapshotCodecProvider(conf),
+                                                       new 
TxMetricsCollector());
+        this.storage.startAndWait();
+        this.snapshotRefreshFrequency = 
conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
+                                                     
TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
+        this.initialized = true;
+      } else {
+        LOG.info("Could not load configuration");
+      }
+    } catch (Exception e) {
+      LOG.info("Failed to initialize TransactionStateCache due to: " + 
e.getMessage());
+    }
+  }
+
+  protected Configuration getSnapshotConfiguration() throws IOException {
+    Configuration conf = new ConfigurationFactory().get(hConf);
+    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    return conf;
+  }
+
+  private void reset() {
+    this.storage.stop();
+    this.lastRefresh = 0;
+    this.initialized = false;
+  }
+
+  private void startRefreshService() {
+    this.refreshService = new Thread("tx-state-refresh") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          if (latestState == null || System.currentTimeMillis() > (lastRefresh 
+ snapshotRefreshFrequency)) {
+            try {
+              refreshState();
+            } catch (IOException ioe) {
+              LOG.info("Error refreshing transaction state cache: " + 
ioe.getMessage());
+            }
+          }
+          try {
+            TimeUnit.SECONDS.sleep(CHECK_FREQUENCY);
+          } catch (InterruptedException ie) {
+            // reset status
+            interrupt();
+            break;
+          }
+        }
+        LOG.info("Exiting thread " + getName());
+      }
+    };
+    this.refreshService.setDaemon(true);
+    this.refreshService.start();
+  }
+
+  private void refreshState() throws IOException {
+    if (!initialized) {
+      tryInit();
+    }
+
+    // only continue if initialization was successful
+    if (initialized) {
+      long now = System.currentTimeMillis();
+      TransactionVisibilityState currentState = 
storage.getLatestTransactionVisibilityState();
+      if (currentState != null) {
+        if (currentState.getTimestamp() < (now - 2 * 
snapshotRefreshFrequency)) {
+          LOG.info("Current snapshot is old, will force a refresh on next 
run.");
+          reset();
+        } else {
+          latestState = currentState;
+          LOG.info("Transaction state reloaded with snapshot from " + 
latestState.getTimestamp());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Latest transaction snapshot: " + 
latestState.toString());
+          }
+          lastRefresh = now;
+        }
+      } else {
+        LOG.info("No transaction state found.");
+      }
+    }
+  }
+
+  public TransactionVisibilityState getLatestState() {
+    return latestState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
 
b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
new file mode 100644
index 0000000..d19da36
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Supplies instances of {@link TransactionStateCache} implementations.
+ */
+public class TransactionStateCacheSupplier implements 
Supplier<TransactionStateCache> {
+  protected static volatile TransactionStateCache instance;
+  protected static Object lock = new Object();
+
+  protected final Configuration conf;
+
+  public TransactionStateCacheSupplier(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Returns a singleton instance of the transaction state cache, performing 
lazy initialization if necessary.
+   * @return A shared instance of the transaction state cache.
+   */
+  @Override
+  public TransactionStateCache get() {
+    if (instance == null) {
+      synchronized (lock) {
+        if (instance == null) {
+          instance = new TransactionStateCache();
+          instance.setConf(conf);
+          instance.start();
+        }
+      }
+    }
+    return instance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java 
b/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java
new file mode 100644
index 0000000..c554e37
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains HBase coprocessor implementations for the transaction 
system.
+ */
+package org.apache.tephra.coprocessor;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java
new file mode 100644
index 0000000..3abdafa
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An abstract tx client provider that implements common functionality.
+ */
+public abstract class AbstractClientProvider implements ThriftClientProvider {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractClientProvider.class);
+
+  // Discovery service. If null, no service discovery.
+  private final DiscoveryServiceClient discoveryServiceClient;
+  protected final AtomicBoolean initialized = new AtomicBoolean(false);
+
+  // the configuration
+  final Configuration configuration;
+
+  // the endpoint strategy for service discovery.
+  EndpointStrategy endpointStrategy;
+
+  protected AbstractClientProvider(Configuration configuration, 
DiscoveryServiceClient discoveryServiceClient) {
+    this.configuration = configuration;
+    this.discoveryServiceClient = discoveryServiceClient;
+  }
+
+  public void initialize() throws TException {
+    if (initialized.compareAndSet(false, true)) {
+      this.initDiscovery();
+    }
+  }
+
+  /**
+   * Initialize the service discovery client, we will reuse that
+   * every time we need to create a new client.
+   */
+  private void initDiscovery() {
+    if (discoveryServiceClient == null) {
+      LOG.info("No DiscoveryServiceClient provided. Skipping service 
discovery.");
+      return;
+    }
+
+    endpointStrategy = new TimeLimitEndpointStrategy(
+      new RandomEndpointStrategy(
+        discoveryServiceClient.discover(
+          
configuration.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
+                            
TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME))),
+      2, TimeUnit.SECONDS);
+  }
+
+  protected TransactionServiceThriftClient newClient() throws TException {
+    return newClient(-1);
+  }
+
+  protected TransactionServiceThriftClient newClient(int timeout) throws 
TException {
+    initialize();
+    String address;
+    int port;
+
+    if (endpointStrategy == null) {
+      // if there is no discovery service, try to read host and port directly
+      // from the configuration
+      LOG.info("Reading address and port from configuration.");
+      address = configuration.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS,
+                                  
TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
+      port = configuration.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT,
+                                  
TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
+      LOG.info("Service assumed at " + address + ":" + port);
+    } else {
+      Discoverable endpoint = endpointStrategy.pick();
+      if (endpoint == null) {
+        LOG.error("Unable to discover tx service.");
+        throw new TException("Unable to discover tx service.");
+      }
+      address = endpoint.getSocketAddress().getHostName();
+      port = endpoint.getSocketAddress().getPort();
+      LOG.info("Service discovered at " + address + ":" + port);
+    }
+
+    // now we have an address and port, try to connect a client
+    if (timeout < 0) {
+      timeout = 
configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT,
+          TxConstants.Service.DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS);
+    }
+    LOG.info("Attempting to connect to tx service at " +
+               address + ":" + port + " with timeout " + timeout + " ms.");
+    // thrift transport layer
+    TTransport transport =
+        new TFramedTransport(new TSocket(address, port, timeout));
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      LOG.error("Unable to connect to tx service: " + e.getMessage());
+      throw e;
+    }
+    // and create a thrift client
+    TransactionServiceThriftClient newClient = new 
TransactionServiceThriftClient(transport);
+
+    LOG.info("Connected to tx service at " +
+               address + ":" + port);
+    return newClient;
+  }
+
+  /**
+   * This class helps picking up an endpoint from a list of Discoverable.
+   */
+  public interface EndpointStrategy {
+
+    /**
+     * Picks a {@link Discoverable} using its strategy.
+     * @return A {@link Discoverable} based on the stragegy or {@code null} if 
no endpoint can be found.
+     */
+    Discoverable pick();
+  }
+
+  /**
+   * An {@link EndpointStrategy} that make sure it picks an endpoint within 
the given
+   * timeout limit.
+   */
+  public final class TimeLimitEndpointStrategy implements EndpointStrategy {
+
+    private final EndpointStrategy delegate;
+    private final long timeout;
+    private final TimeUnit timeoutUnit;
+
+    public TimeLimitEndpointStrategy(EndpointStrategy delegate, long timeout, 
TimeUnit timeoutUnit) {
+      this.delegate = delegate;
+      this.timeout = timeout;
+      this.timeoutUnit = timeoutUnit;
+    }
+
+    @Override
+    public Discoverable pick() {
+      Discoverable pick = delegate.pick();
+      try {
+        long count = 0;
+        while (pick == null && count++ < timeout) {
+          timeoutUnit.sleep(1);
+          pick = delegate.pick();
+        }
+      } catch (InterruptedException e) {
+        // Simply propagate the interrupt.
+        Thread.currentThread().interrupt();
+      }
+      return pick;
+    }
+  }
+
+  /**
+   * Randomly picks endpoint from the list of available endpoints.
+   */
+  public final class RandomEndpointStrategy implements EndpointStrategy {
+
+    private final Iterable<Discoverable> endpoints;
+
+    /**
+     * Constructs a random endpoint strategy.
+     * @param endpoints Endpoints for the strategy to use. Note that this 
strategy will
+     *                  invoke {@link Iterable#iterator()} and traverse 
through it on
+     *                  every call to the {@link #pick()} method. One could 
leverage this
+     *                  behavior with the live {@link Iterable} as provided by
+     *                  {@link 
org.apache.twill.discovery.DiscoveryServiceClient#discover(String)} method.
+     */
+    public RandomEndpointStrategy(Iterable<Discoverable> endpoints) {
+      this.endpoints = endpoints;
+    }
+
+    @Override
+    public Discoverable pick() {
+      // Reservoir sampling
+      Discoverable result = null;
+      Iterator<Discoverable> itor = endpoints.iterator();
+      Random random = new Random();
+      int count = 0;
+      while (itor.hasNext()) {
+        Discoverable next = itor.next();
+        if (random.nextInt(++count) == 0) {
+          result = next;
+        }
+      }
+      return result;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java
new file mode 100644
index 0000000..ee73ef0
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+/**
+ * An {@link AutoCloseable} to automatically return the thrift client to the 
ThriftClientProvider.
+ */
+public class CloseableThriftClient implements AutoCloseable {
+
+  private final ThriftClientProvider provider;
+  private final TransactionServiceThriftClient thriftClient;
+
+  public CloseableThriftClient(ThriftClientProvider provider, 
TransactionServiceThriftClient thriftClient) {
+    this.provider = provider;
+    this.thriftClient = thriftClient;
+  }
+
+  public TransactionServiceThriftClient getThriftClient() {
+    return thriftClient;
+  }
+
+  @Override
+  public void close() {
+    // in any case, the client must be returned to the pool. The pool is
+    // responsible for discarding the client if it is in a bad state.
+    provider.returnClient(thriftClient);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java 
b/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java
new file mode 100644
index 0000000..8ba3218
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An Elastic Pool is an object pool that can dynamically grow.
+ * Normally, an element is obtained by a client and then returned to the pool
+ * after use. However, if the element gets into a bad state, the element can
+ * be discarded, based upon the recycle() method returning false. This will
+ * cause the element to be removed from the pool, and for a subsequent request,
+ * a new element can be created on the fly to replace the discarded one.
+ *
+ * The pool starts with zero (active) elements. Every time a client attempts
+ * to obtain an element, an element from the pool is returned if available.
+ * Otherwise, if the number of active elements is less than the pool's limit,
+ * a new element is created (using abstract method create(), this must be
+ * overridden by all implementations), and the number of active elements is
+ * increased by one. If the limit is reached, then obtain() blocks until
+ * either an element is returned to the pool or, if the obtain method with 
timeout
+ * parameters is used, a timeout occurs.
+ *
+ * Every time an element is returned to the pool, it is "recycled" to restore 
its
+ * fresh state for the next use or destroyed, depending on its state.
+ *
+ * @param <T> the type of the elements
+ * @param <E> the type of exception thrown by create()
+ */
+public abstract class ElasticPool<T, E extends Exception> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ElasticPool.class);
+
+  /**
+   * A method to create a new element. Will be called every time the pool
+   * of available elements is empty but the limit of active elements is
+   * not exceeded.
+   * @return a new element
+   */
+  protected abstract T create() throws E;
+
+  /**
+   * A method to recycle an existing element when it is returned to the pool.
+   * This methods ensures that the element is in a fresh state before it can
+   * be reused by the next agent. If the element is not to be returned to the 
pool,
+   * calling code is responsible for destroying it and returning false.
+   *
+   * @param element the element to recycle
+   * @return true to reuse element, false to remove it from the pool
+   */
+  protected boolean recycle(T element) {
+    // by default, simply return true
+    return true;
+  }
+
+  // holds all currently available elements
+  private final ConcurrentLinkedQueue<T> elements;
+
+  // we keep track of elements via the permits of a semaphore, because there 
can
+  // be elements in a queue, but also elements that are "loaned out" count 
towards
+  // the pool's size limit
+  private final Semaphore semaphore;
+
+  public ElasticPool(int sizeLimit) {
+    elements = new ConcurrentLinkedQueue<>();
+    semaphore = new Semaphore(sizeLimit, true);
+  }
+
+  /**
+   * Get a element from the pool. If there is an available element in
+   * the pool, it will be returned. Otherwise, if the number of active
+   * elements does not exceed the limit, a new element is created with
+   * create() and returned. Otherwise, blocks until an element is either
+   * released and returned to the pool, or an element is discarded,
+   * allowing for the creation of a new element.
+   *
+   * @return an element
+   */
+  public T obtain() throws E, InterruptedException {
+    semaphore.acquire();
+    return getOrCreate();
+  }
+
+  /**
+   * Get a element from the pool. If there is an available element in
+   * the pool, it will be returned. Otherwise, if the number of active
+   * elements does ot exceed the limit, a new element is created with
+   * create() and returned. Otherwise, blocks until an element is either
+   * released and returned to the pool, an element is discarded,
+   * allowing for the creation of a new element, or a timeout occurs.
+   *
+   * @param timeout the timeout for trying to obtain an element
+   * @param unit the timeout unit for trying to obtain an element
+   * @return an element
+   * @throws TimeoutException if a client is not able to be obtained within 
the given timeout
+   */
+  public T obtain(long timeout, TimeUnit unit) throws E, TimeoutException, 
InterruptedException {
+    if (!semaphore.tryAcquire(1, timeout, unit)) {
+      throw new TimeoutException(String.format("Failed to obtain client within 
%d %s.",
+                                               timeout, unit));
+    }
+    return getOrCreate();
+  }
+
+  // gets an element from the queue, or creates one if there is none available 
in the queue.
+  // the semaphore must be acquired before calling this method. The semaphore 
will be released from within
+  // this method if it throws any exception
+  private T getOrCreate() throws E {
+    try {
+      T client = elements.poll();
+      // a client was available, all good. otherwise, create one
+      if (client != null) {
+        return client;
+      }
+      return create();
+    } catch (Exception e) {
+      // if an exception is thrown after acquiring the semaphore, release the
+      // semaphore before propagating the exception
+      semaphore.release();
+      throw e;
+    }
+  }
+
+  /**
+   * Returns an element to the pool of available elements. The element must
+   * have been obtained from this pool through obtain(). The recycle() method
+   * is called before the element is available for obtain().
+   *
+   * @param element the element to be returned
+   */
+  public void release(T element) {
+    try {
+      if (recycle(element)) {
+        elements.add(element);
+      }
+    } finally {
+      semaphore.release();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java
new file mode 100644
index 0000000..9df1441
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.thrift.TException;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is an tx client provider that uses a bounded size pool of connections.
+ */
+public class PooledClientProvider extends AbstractClientProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PooledClientProvider.class);
+
+  // we will use this as a pool of tx clients
+  class TxClientPool extends ElasticPool<TransactionServiceThriftClient, 
TException> {
+    TxClientPool(int sizeLimit) {
+      super(sizeLimit);
+    }
+
+    @Override
+    protected TransactionServiceThriftClient create() throws TException {
+      return newClient();
+    }
+
+    @Override
+    protected boolean recycle(TransactionServiceThriftClient client) {
+      if (!client.isValid()) {
+        client.close();
+        return false;
+      }
+      return true;
+    }
+  }
+
+  // we will use this as a pool of tx clients
+  private volatile TxClientPool clients;
+
+  // the limit for the number of active clients
+  private int maxClients;
+  // timeout, for obtaining a client
+  private long obtainClientTimeoutMs;
+
+  public PooledClientProvider(Configuration conf, DiscoveryServiceClient 
discoveryServiceClient) {
+    super(conf, discoveryServiceClient);
+  }
+
+  private void initializePool() throws TException {
+    // initialize the super class (needed for service discovery)
+    super.initialize();
+
+    // create a (empty) pool of tx clients
+    maxClients = 
configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT,
+                                      
TxConstants.Service.DEFAULT_DATA_TX_CLIENT_COUNT);
+    if (maxClients < 1) {
+      LOG.warn("Configuration of " + 
TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
+                 " is invalid: value is " + maxClients + " but must be at 
least 1. " +
+                 "Using 1 as a fallback. ");
+      maxClients = 1;
+    }
+
+    obtainClientTimeoutMs =
+      
configuration.getLong(TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS,
+                            
TxConstants.Service.DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS);
+    if (obtainClientTimeoutMs < 0) {
+      LOG.warn("Configuration of " + 
TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
+                 " is invalid: value is " + obtainClientTimeoutMs + " but must 
be at least 0. " +
+                 "Using 0 as a fallback. ");
+      obtainClientTimeoutMs = 0;
+    }
+    this.clients = new TxClientPool(maxClients);
+  }
+
+  @Override
+  public CloseableThriftClient getCloseableClient() throws TException, 
TimeoutException, InterruptedException {
+    TransactionServiceThriftClient client = 
getClientPool().obtain(obtainClientTimeoutMs, TimeUnit.MILLISECONDS);
+    return new CloseableThriftClient(this, client);
+  }
+
+  @Override
+  public void returnClient(TransactionServiceThriftClient client) {
+    getClientPool().release(client);
+  }
+
+  @Override
+  public String toString() {
+    return "Elastic pool of size " + this.maxClients +
+      ", with timeout (in milliseconds): " + this.obtainClientTimeoutMs;
+  }
+
+  private TxClientPool getClientPool() {
+    if (clients != null) {
+      return clients;
+    }
+
+    synchronized (this) {
+      if (clients == null) {
+        try {
+          initializePool();
+        } catch (TException e) {
+          LOG.error("Failed to initialize Tx client provider", e);
+          throw Throwables.propagate(e);
+        }
+      }
+    }
+    return clients;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java
new file mode 100644
index 0000000..8655c44
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+
+/**
+ * Retry strategy that allows a fixed number of attempts and then gives up.
+ * It does nothing between attempts; subclasses may override
+ * {@link #beforeRetry()} to add a sleep or similar.
+ */
+public class RetryNTimes extends RetryStrategy {
+
+  int attempts = 0;
+  int limit;
+
+  /**
+   * @param maxAttempts the total number of attempts after which to stop
+   */
+  protected RetryNTimes(int maxAttempts) {
+    this.limit = maxAttempts;
+  }
+
+  @Override
+  boolean failOnce() {
+    attempts += 1;
+    // keep going only while fewer than 'limit' attempts have been made
+    return limit > attempts;
+  }
+
+  /**
+   * Creates {@link RetryNTimes} strategies using an attempt limit taken from the configuration.
+   */
+  public static class Provider implements RetryStrategyProvider {
+
+    int nTimes;
+
+    public Provider() {
+      nTimes = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_ATTEMPTS;
+    }
+
+    /**
+     * Reads the attempt limit from {@code CFG_DATA_TX_CLIENT_ATTEMPTS}, falling back to the
+     * current value when the key is not set.
+     */
+    @Override
+    public void configure(Configuration config) {
+      int configured = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, nTimes);
+      nTimes = configured;
+    }
+
+    @Override
+    public RetryStrategy newRetryStrategy() {
+      return new RetryNTimes(this.nTimes);
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(nTimes).concat(" attempts without delay");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java
new file mode 100644
index 0000000..6fd6c9f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+/**
+ * Abstraction over how the remote tx client should retry operations after
+ * connection failures.
+ */
+public abstract class RetryStrategy {
+
+  /**
+   * Records one more failed attempt.
+   *
+   * @return true if another attempt should be made, false to give up
+   */
+  abstract boolean failOnce();
+
+  /**
+   * Hook invoked before each re-attempt; subclasses may, for instance, sleep
+   * here to space out retries. The default implementation does nothing.
+   */
+  void beforeRetry() {
+    // intentionally empty: no delay between attempts by default
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategyProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategyProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategyProvider.java
new file mode 100644
index 0000000..7763ea5
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategyProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Supplies the tx client with a new {@link RetryStrategy} for every call.
+ */
+public interface RetryStrategyProvider {
+
+  /**
+   * Applies settings from the given configuration to this provider.
+   *
+   * @param config the configuration to read settings from
+   */
+  void configure(Configuration config);
+
+  /**
+   * Creates a new retry strategy instance.
+   *
+   * @return a retry strategy
+   */
+  RetryStrategy newRetryStrategy();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java
new file mode 100644
index 0000000..ce1130e
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A retry strategy with exponential backoff. Before every re-attempt it sleeps
+ * for the current sleep time and then multiplies that time by the backoff
+ * factor; it gives up once the current sleep time has reached the limit.
+ *
+ * Note: if the sleep time cannot grow (for example a backoff factor of 1 or
+ * less, or an initial sleep of 0) and starts below the limit, retries never stop.
+ */
+public class RetryWithBackoff extends RetryStrategy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RetryWithBackoff.class);
+
+  int initialSleep; // initial sleep time
+  int backoffFactor; // factor by which to increase sleep for each retry
+  int maxSleep; // max sleep time. stop retrying once the current sleep reaches this
+  int sleep; // current sleep time
+
+  /**
+   * @param initial the initial sleep time (before first retry)
+   * @param backoff the backoff factor by which sleep time is multiplied
+   *                after each retry
+   * @param limit the max sleep time. if sleep time reaches this limit, we
+   *              stop retrying
+   */
+  protected RetryWithBackoff(int initial, int backoff, int limit) {
+    initialSleep = initial;
+    backoffFactor = backoff;
+    maxSleep = limit;
+    sleep = initialSleep;
+  }
+
+  /**
+   * @return true while the current sleep time is still below the limit
+   */
+  @Override
+  boolean failOnce() {
+    return sleep < maxSleep;
+  }
+
+  /**
+   * Sleeps for the current sleep time (milliseconds, measured with
+   * System.currentTimeMillis), then multiplies the sleep time by the backoff
+   * factor. An interrupt does not cut the sleep short, but the thread's
+   * interrupt status is restored before returning instead of being lost.
+   */
+  @Override
+  void beforeRetry() {
+    LOG.info("Sleeping " + sleep + " ms before retry.");
+    long current = System.currentTimeMillis();
+    long end = current + sleep;
+    boolean interrupted = false;
+    while (current < end) {
+      try {
+        Thread.sleep(end - current);
+      } catch (InterruptedException e) {
+        // finish the backoff period, but remember the interrupt so it can be restored
+        interrupted = true;
+      }
+      current = System.currentTimeMillis();
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+    sleep = multiplySaturated(sleep, backoffFactor);
+  }
+
+  /**
+   * Returns {@code value * factor} clamped to the int range, so a large sleep time
+   * cannot overflow into a negative value (which would skip the sleep entirely).
+   */
+  private static int multiplySaturated(int value, int factor) {
+    long product = (long) value * factor;
+    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, product));
+  }
+
+  /**
+   * A provider for this retry strategy.
+   */
+  public static class Provider implements RetryStrategyProvider {
+
+    int initialSleep; // initial sleep time
+    int backoffFactor; // factor by which to increase sleep for each retry
+    int maxSleep; // max sleep time. stop retrying once the current sleep reaches this
+
+    public Provider() {
+      initialSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL;
+      backoffFactor = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR;
+      maxSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT;
+    }
+
+    /**
+     * Reads initial sleep, backoff factor and limit from the configuration,
+     * keeping the current values for keys that are not set.
+     */
+    @Override
+    public void configure(Configuration config) {
+      initialSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_INITIAL, initialSleep);
+      backoffFactor = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_FACTOR, backoffFactor);
+      maxSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_LIMIT, maxSleep);
+    }
+
+    @Override
+    public RetryStrategy newRetryStrategy() {
+      return new RetryWithBackoff(initialSleep, backoffFactor, maxSleep);
+    }
+
+    @Override
+    public String toString() {
+      return "sleep " + initialSleep + " ms with back off factor " +
+          backoffFactor + " and limit " + maxSleep + " ms";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java
new file mode 100644
index 0000000..d55da5e
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A tx client provider that creates a new connection for every request and
+ * closes it as soon as the client is returned.
+ */
+public class SingleUseClientProvider extends AbstractClientProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SingleUseClientProvider.class);
+
+  // timeout passed to newClient(int) when opening each connection
+  final int timeout;
+
+  public SingleUseClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient, int timeout) {
+    super(conf, discoveryServiceClient);
+    this.timeout = timeout;
+  }
+
+  /**
+   * Opens a brand-new connection and wraps it so it is returned to this provider after use.
+   *
+   * @throws TException if the new client cannot be created (logged, then rethrown)
+   */
+  @Override
+  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
+    try {
+      return new CloseableThriftClient(this, newClient(timeout));
+    } catch (TException e) {
+      // pass the exception so its stack trace and cause chain are not lost
+      LOG.error("Unable to create new tx client: " + e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Closes the client; single-use clients are never reused.
+   */
+  @Override
+  public void returnClient(TransactionServiceThriftClient client) {
+    client.close();
+  }
+
+  @Override
+  public String toString() {
+    return "Single-use(timeout = " + timeout + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java
new file mode 100644
index 0000000..dedaffe
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.TException;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A tx client provider that uses a thread local to maintain at most one open
+ * connection per thread.
+ * Note that there can be a connection leak if the threads are recycled.
+ */
+public class ThreadLocalClientProvider extends AbstractClientProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ThreadLocalClientProvider.class);
+
+  // the calling thread's cached client, if any
+  ThreadLocal<TransactionServiceThriftClient> clients = new ThreadLocal<>();
+
+  public ThreadLocalClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
+    super(conf, discoveryServiceClient);
+  }
+
+  /**
+   * Returns the calling thread's cached client, creating and caching a new one
+   * if the thread does not have one yet.
+   *
+   * @throws TException if a new client cannot be created (logged, then rethrown)
+   */
+  @Override
+  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
+    TransactionServiceThriftClient client = this.clients.get();
+    if (client == null) {
+      try {
+        client = this.newClient();
+        clients.set(client);
+      } catch (TException e) {
+        // pass the exception so its stack trace and cause chain are not lost
+        LOG.error("Unable to create new tx client for thread: "
+                    + e.getMessage(), e);
+        throw e;
+      }
+    }
+    return new CloseableThriftClient(this, client);
+  }
+
+  /**
+   * Keeps a valid client cached for reuse. An invalid client is closed and
+   * removed so that the next request creates a new one.
+   * NOTE(review): clients.remove() clears the calling thread's entry, so this
+   * assumes a client is returned by the thread that obtained it.
+   */
+  @Override
+  public void returnClient(TransactionServiceThriftClient client) {
+    if (!client.isValid()) {
+      client.close();
+      clients.remove();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "Thread-local";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java
new file mode 100644
index 0000000..6a41912
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import org.apache.thrift.TException;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Provides thrift tx service clients.
+ *
+ * There is a single tx service per JVM, but many threads may use it
+ * concurrently. Thrift clients are not thread-safe, so rather than serializing
+ * every tx call by synchronizing on one client, clients are pooled. The best
+ * pooling strategy depends on the workload: with many short-lived threads a
+ * pool shared by all threads is wise, whereas with few long-lived threads a
+ * thread-local client per thread may be better.
+ *
+ * This interface abstracts over the pooling strategy.
+ */
+public interface ThriftClientProvider {
+
+  /**
+   * Initializes the provider. At this point it should be verified that the tx
+   * service is up and running, so that {@link #getCloseableClient()} can create
+   * new clients when necessary.
+   */
+  void initialize() throws TException;
+
+  /**
+   * Retrieves a tx client, wrapped in an AutoCloseable, for exclusive use by
+   * the current thread. The wrapper must be closed after use, which returns
+   * the client to the provider.
+   *
+   * @return a connected, fully functional tx client
+   */
+  CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException;
+
+  /**
+   * Releases a tx client back to the provider's pool, if the client is valid.
+   * If the client has become dysfunctional (for instance, due to a socket
+   * exception), the provider must close it, remove it from its pool, and be
+   * prepared to create a new client subsequently.
+   *
+   * @param client the client to return
+   */
+  void returnClient(TransactionServiceThriftClient client);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java
new file mode 100644
index 0000000..66e463d
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.primitives.Longs;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.tephra.distributed.thrift.TTransactionType;
+import org.apache.tephra.distributed.thrift.TVisibilityLevel;
+
+/**
+ * Static helpers that convert a {@link Transaction} to its thrift form
+ * ({@link TTransaction}) and back.
+ */
+public final class TransactionConverterUtils {
+  private static final long[] EMPTY_LONG_ARRAY = {};
+
+  /**
+   * Converts a transaction into its thrift representation.
+   */
+  public static TTransaction wrap(Transaction tx) {
+    return new TTransaction(
+        tx.getTransactionId(),
+        tx.getReadPointer(),
+        Longs.asList(tx.getInvalids()),
+        Longs.asList(tx.getInProgress()),
+        tx.getFirstShortInProgress(),
+        getTTransactionType(tx.getType()),
+        tx.getWritePointer(),
+        Longs.asList(tx.getCheckpointWritePointers()),
+        getTVisibilityLevel(tx.getVisibilityLevel()));
+  }
+
+  /**
+   * Converts a thrift transaction back into a {@link Transaction}. Missing
+   * (null) id lists become empty arrays, and a missing visibility level
+   * becomes SNAPSHOT.
+   */
+  public static Transaction unwrap(TTransaction thriftTx) {
+    return new Transaction(
+        thriftTx.getReadPointer(),
+        thriftTx.getTransactionId(),
+        thriftTx.getWritePointer(),
+        toLongArray(thriftTx.getInvalids()),
+        toLongArray(thriftTx.getInProgress()),
+        thriftTx.getFirstShort(),
+        getTransactionType(thriftTx.getType()),
+        toLongArray(thriftTx.getCheckpointWritePointers()),
+        getVisibilityLevel(thriftTx.getVisibilityLevel()));
+  }
+
+  // null-tolerant conversion of a thrift list of ids into a primitive array
+  private static long[] toLongArray(java.util.Collection<? extends Number> values) {
+    if (values == null) {
+      return EMPTY_LONG_ARRAY;
+    }
+    return Longs.toArray(values);
+  }
+
+  // anything other than SHORT (including null) maps to LONG
+  private static TransactionType getTransactionType(TTransactionType tType) {
+    if (tType == TTransactionType.SHORT) {
+      return TransactionType.SHORT;
+    }
+    return TransactionType.LONG;
+  }
+
+  // anything other than SHORT (including null) maps to LONG
+  private static TTransactionType getTTransactionType(TransactionType type) {
+    if (type == TransactionType.SHORT) {
+      return TTransactionType.SHORT;
+    }
+    return TTransactionType.LONG;
+  }
+
+  private static Transaction.VisibilityLevel getVisibilityLevel(TVisibilityLevel tLevel) {
+    // a missing level defaults to SNAPSHOT
+    if (tLevel == null || tLevel == TVisibilityLevel.SNAPSHOT) {
+      return Transaction.VisibilityLevel.SNAPSHOT;
+    }
+    if (tLevel == TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT) {
+      return Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+    }
+    if (tLevel == TVisibilityLevel.SNAPSHOT_ALL) {
+      return Transaction.VisibilityLevel.SNAPSHOT_ALL;
+    }
+    throw new IllegalArgumentException("Unknown TVisibilityLevel: " + tLevel);
+  }
+
+  // unlike the reverse direction, a null level is rejected (switch on null throws NullPointerException)
+  private static TVisibilityLevel getTVisibilityLevel(Transaction.VisibilityLevel level) {
+    switch (level) {
+      case SNAPSHOT_ALL:
+        return TVisibilityLevel.SNAPSHOT_ALL;
+      case SNAPSHOT_EXCLUDE_CURRENT:
+        return TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
+      case SNAPSHOT:
+        return TVisibilityLevel.SNAPSHOT;
+      default:
+        throw new IllegalArgumentException("Unknown VisibilityLevel: " + level);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
new file mode 100644
index 0000000..6fbf926
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.distributed.thrift.TTransactionServer;
+import org.apache.tephra.inmemory.InMemoryTransactionService;
+import org.apache.tephra.rpc.ThriftRPCServer;
+import org.apache.twill.api.ElectionHandler;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.internal.ServiceListenerAdapter;
+import org.apache.twill.internal.zookeeper.LeaderElection;
+import org.apache.twill.zookeeper.ZKClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Distributed transaction service. Each instance takes part in a ZooKeeper
+ * leader election under {@code /tx.service/leader}. Only the current leader
+ * obtains a {@link TransactionManager}, starts the thrift RPC server and
+ * registers itself for discovery. When an instance becomes a follower, it stops
+ * its RPC server and de-registers.
+ */
+public final class TransactionService extends InMemoryTransactionService {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
+  private LeaderElection leaderElection;
+  private final ZKClient zkClient;
+
+  // only assigned once this instance becomes leader
+  private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server;
+
+  @Inject
+  public TransactionService(Configuration conf,
+                            ZKClient zkClient,
+                            DiscoveryService discoveryService,
+                            Provider<TransactionManager> txManagerProvider) {
+    super(conf, discoveryService, txManagerProvider);
+    this.zkClient = zkClient;
+  }
+
+  /**
+   * Returns the address of this service. If the configured address is the
+   * wildcard {@code 0.0.0.0}, the local host name is used together with the
+   * server's bound port. If the host name cannot be resolved, or the address is
+   * anything else, the server's bind address is returned.
+   */
+  @Override
+  protected InetSocketAddress getAddress() {
+    // constant-first comparison so that an unset address does not cause an NPE
+    if ("0.0.0.0".equals(address)) {
+      // resolve hostname
+      try {
+        return new InetSocketAddress(InetAddress.getLocalHost().getHostName(), server.getBindAddress().getPort());
+      } catch (UnknownHostException x) {
+        LOG.error("Cannot resolve hostname for 0.0.0.0", x);
+      }
+    }
+    return server.getBindAddress();
+  }
+
+  @Override
+  protected void doStart() {
+    leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() {
+      @Override
+      public void leader() {
+        // if the txManager fails, we should stop the server
+        txManager = txManagerProvider.get();
+        txManager.addListener(new ServiceListenerAdapter() {
+          @Override
+          public void failed(State from, Throwable failure) {
+            LOG.error("Transaction manager aborted, stopping transaction service", failure);
+            TransactionService.this.abort(failure);
+          }
+        }, MoreExecutors.sameThreadExecutor());
+
+        server = ThriftRPCServer.builder(TTransactionServer.class)
+          .setHost(address)
+          .setPort(port)
+          .setWorkerThreads(threads)
+          .setMaxReadBufferBytes(maxReadBufferBytes)
+          .setIOThreads(ioThreads)
+          .build(new TransactionServiceThriftHandler(txManager));
+        try {
+          server.startAndWait();
+          doRegister();
+          LOG.info("Transaction Thrift Service started successfully on " + getAddress());
+        } catch (Throwable t) {
+          // a failed start is an error: log it with its cause before giving up leadership
+          LOG.error("Transaction Thrift Service didn't start on " + server.getBindAddress(), t);
+          leaderElection.stop();
+          notifyFailed(t);
+        }
+      }
+
+      @Override
+      public void follower() {
+        // First stop the transaction server as un-registering from discovery can block sometimes.
+        // That can lead to multiple transaction servers being active at the same time.
+        if (server != null && server.isRunning()) {
+          server.stopAndWait();
+        }
+        undoRegister();
+      }
+    });
+    leaderElection.start();
+
+    notifyStarted();
+  }
+
+  /**
+   * Returns the state of the thrift RPC server. The server is only created once
+   * this instance becomes leader; before that, this method throws NullPointerException.
+   */
+  @VisibleForTesting
+  State thriftRPCServerState() {
+    return server.state();
+  }
+
+  @Override
+  protected void doStop() {
+    internalStop();
+    notifyStopped();
+  }
+
+  /**
+   * Gives up leadership (which stops the RPC server if needed) and marks this service as failed.
+   */
+  protected void abort(Throwable cause) {
+    // try to clear leader status and shutdown RPC
+    internalStop();
+    notifyFailed(cause);
+  }
+
+  /**
+   * Stops the leader election, waiting at most 5 seconds for it to complete.
+   * Timeouts and execution failures are logged rather than propagated.
+   */
+  protected void internalStop() {
+    if (leaderElection != null) {
+      // NOTE: if this instance was the leader, this causes it to lose leadership, which in the
+      //       callback above de-registers the service from discovery and stops the server if needed
+      try {
+        Uninterruptibles.getUninterruptibly(leaderElection.stop(), 5, TimeUnit.SECONDS);
+      } catch (TimeoutException te) {
+        LOG.warn("Timed out waiting for leader election cancellation to complete");
+      } catch (ExecutionException e) {
+        LOG.error("Exception when cancelling leader election.", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
new file mode 100644
index 0000000..ca15e9e
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.thrift.TException;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A tx service client
+ */
+public class TransactionServiceClient implements TransactionSystemClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TransactionServiceClient.class);
+
+  // we will use this to provide every call with an tx client
+  private ThriftClientProvider clientProvider;
+
+  // the retry strategy we will use
+  private final RetryStrategyProvider retryStrategyProvider;
+
+  /**
+   * Utility to be used for basic verification of transaction system 
availability and functioning
+   * @param args arguments list, accepts single option "-v" that makes it to 
print out more details about started tx
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) {
+      System.out.println("USAGE: TransactionServiceClient [-v]");
+    }
+
+    boolean verbose = false;
+    if (args.length == 1 && "-v".equals(args[0])) {
+      verbose = true;
+    }
+    doMain(verbose, new ConfigurationFactory().get());
+  }
+
+  @VisibleForTesting
+  public static void doMain(boolean verbose, Configuration conf) throws 
Exception {
+    LOG.info("Starting tx server client test.");
+    Injector injector = Guice.createInjector(
+      new ConfigModule(conf),
+      new ZKModule(),
+      new DiscoveryModules().getDistributedModules(),
+      new TransactionModules().getDistributedModules(),
+      new TransactionClientModule()
+    );
+
+    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+
+    try {
+      TransactionServiceClient client = 
injector.getInstance(TransactionServiceClient.class);
+      LOG.info("Starting tx...");
+      Transaction tx = client.startShort();
+      if (verbose) {
+        LOG.info("Started tx details: " + tx.toString());
+      } else {
+        LOG.info("Started tx: " + tx.getTransactionId() +
+                   ", readPointer: " + tx.getReadPointer() +
+                   ", invalids: " + tx.getInvalids().length +
+                   ", inProgress: " + tx.getInProgress().length);
+      }
+      LOG.info("Checking if canCommit tx...");
+      boolean canCommit = client.canCommit(tx, 
Collections.<byte[]>emptyList());
+      LOG.info("canCommit: " + canCommit);
+      if (canCommit) {
+        LOG.info("Committing tx...");
+        boolean committed = client.commit(tx);
+        LOG.info("Committed tx: " + committed);
+        if (!committed) {
+          LOG.info("Aborting tx...");
+          client.abort(tx);
+          LOG.info("Aborted tx...");
+        }
+      } else {
+        LOG.info("Aborting tx...");
+        client.abort(tx);
+        LOG.info("Aborted tx...");
+      }
+    } finally {
+      zkClient.stopAndWait();
+    }
+  }
+
+  /**
+   * Create from a configuration. This will first attempt to find a zookeeper
+   * for service discovery. Otherwise it will look for the port in the
+   * config and use localhost.
+   * @param config a configuration containing the zookeeper properties
+   */
+  @Inject
+  public TransactionServiceClient(Configuration config,
+                                  ThriftClientProvider clientProvider) {
+
+    // initialize the retry logic
+    String retryStrat = config.get(
+        TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY,
+        TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY);
+    if ("backoff".equals(retryStrat)) {
+      this.retryStrategyProvider = new RetryWithBackoff.Provider();
+    } else if ("n-times".equals(retryStrat)) {
+      this.retryStrategyProvider = new RetryNTimes.Provider();
+    } else {
+      String message = "Unknown Retry Strategy '" + retryStrat + "'.";
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+    this.retryStrategyProvider.configure(config);
+    LOG.debug("Retry strategy is " + this.retryStrategyProvider);
+
+    this.clientProvider = clientProvider;
+  }
+
+  /**
+   * This is an abstract class that encapsulates an operation. It provides a
+   * method to attempt the actual operation, and it can throw an operation
+   * exception.
+   * @param <T> The return type of the operation
+   */
+  abstract static class Operation<T> {
+
+    /** the name of the operation. */
+    String name;
+
+    /** constructor with name of operation. */
+    Operation(String name) {
+      this.name = name;
+    }
+
+    /** return the name of the operation. */
+    String getName() {
+      return name;
+    }
+
+    /** execute the operation, given an tx client. */
+    abstract T execute(TransactionServiceThriftClient client)
+        throws Exception;
+  }
+
+  /** see execute(operation, client). */
+  private <T> T execute(Operation<T> operation) throws Exception {
+    return execute(operation, null);
+  }
+
+  /**
+   * This is a generic method implementing the somewhat complex execution
+   * and retry logic for operations, to avoid repetitive code.
+   *
+   * Attempts to execute one operation, by obtaining an tx client from
+   * the client provider and passing the operation to the client. If the
+   * call fails with a Thrift exception, apply the retry strategy. If no
+   * more retries are to be made according to the strategy, call the
+   * operation's error method to obtain a value to return. Note that error()
+   * may also throw an exception. Note also that the retry logic is only
+   * applied for thrift exceptions.
+   *
+   * @param operation The operation to be executed
+   * @param provider An tx client provider. If null, then a client will be
+   *                 obtained using the client provider
+   * @param <T> The return type of the operation
+   * @return the result of the operation, or a value returned by error()
+   */
+  private <T> T execute(Operation<T> operation, ThriftClientProvider provider) 
throws Exception {
+    RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy();
+    while (true) {
+      // did we get a custom client provider or do we use the default?
+      if (provider == null) {
+        provider = this.clientProvider;
+      }
+      // this will throw a TException if it cannot get a client
+      try (CloseableThriftClient closeable = provider.getCloseableClient()) {
+        // note that this can throw exceptions other than TException
+        return operation.execute(closeable.getThriftClient());
+
+      } catch (TException te) {
+        // determine whether we should retry
+        boolean retry = retryStrategy.failOnce();
+        if (!retry) {
+          // retry strategy is exceeded, throw an operation exception
+          String message =
+              "Thrift error for " + operation + ": " + te.getMessage();
+          LOG.error(message);
+          LOG.debug(message, te);
+          throw new Exception(message, te);
+        } else {
+          // call retry strategy before retrying
+          retryStrategy.beforeRetry();
+          String msg = "Retrying " + operation.getName() + " after Thrift 
error: " + te.getMessage();
+          LOG.info(msg);
+          LOG.debug(msg, te);
+        }
+
+      }
+    }
+  }
+
+  @Override
+  public Transaction startLong() {
+    try {
+      return execute(
+        new Operation<Transaction>("startLong") {
+          @Override
+          public Transaction execute(TransactionServiceThriftClient client)
+            throws TException {
+            return client.startLong();
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public Transaction startShort() {
+    try {
+      return execute(
+        new Operation<Transaction>("startShort") {
+          @Override
+          public Transaction execute(TransactionServiceThriftClient client)
+            throws TException {
+            return client.startShort();
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public Transaction startShort(final int timeout) {
+    try {
+      return execute(
+        new Operation<Transaction>("startShort") {
+          @Override
+          public Transaction execute(TransactionServiceThriftClient client)
+            throws TException {
+            return client.startShort(timeout);
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean canCommit(final Transaction tx, final Collection<byte[]> 
changeIds)
+    throws TransactionNotInProgressException {
+
+    try {
+      return execute(
+        new Operation<Boolean>("canCommit") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws Exception {
+            return client.canCommit(tx, changeIds);
+          }
+        });
+    } catch (TransactionNotInProgressException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean commit(final Transaction tx) throws 
TransactionNotInProgressException {
+    try {
+      return this.execute(
+        new Operation<Boolean>("commit") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws Exception {
+            return client.commit(tx);
+          }
+        });
+    } catch (TransactionNotInProgressException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void abort(final Transaction tx) {
+    try {
+      this.execute(
+        new Operation<Boolean>("abort") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws TException {
+            client.abort(tx);
+            return true;
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean invalidate(final long tx) {
+    try {
+      return this.execute(
+        new Operation<Boolean>("invalidate") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client)
+            throws TException {
+            return client.invalidate(tx);
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public Transaction checkpoint(final Transaction tx) throws 
TransactionNotInProgressException {
+    try {
+      return this.execute(
+          new Operation<Transaction>("checkpoint") {
+            @Override
+            Transaction execute(TransactionServiceThriftClient client) throws 
Exception {
+              return client.checkpoint(tx);
+            }
+          }
+      );
+    } catch (TransactionNotInProgressException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public InputStream getSnapshotInputStream() throws 
TransactionCouldNotTakeSnapshotException {
+    try {
+      return this.execute(
+          new Operation<InputStream>("takeSnapshot") {
+            @Override
+            public InputStream execute(TransactionServiceThriftClient client)
+                throws Exception {
+              return client.getSnapshotStream();
+            }
+          });
+    } catch (TransactionCouldNotTakeSnapshotException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public String status() {
+    try {
+      return this.execute(
+        new Operation<String>("status") {
+          @Override
+          public String execute(TransactionServiceThriftClient client) throws 
Exception {
+            return client.status();
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void resetState() {
+    try {
+      this.execute(
+          new Operation<Boolean>("resetState") {
+            @Override
+            public Boolean execute(TransactionServiceThriftClient client)
+                throws TException {
+              client.resetState();
+              return true;
+            }
+          });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean truncateInvalidTx(final Set<Long> invalidTxIds) {
+    try {
+      return this.execute(
+        new Operation<Boolean>("truncateInvalidTx") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client) throws 
TException {
+            return client.truncateInvalidTx(invalidTxIds);
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public boolean truncateInvalidTxBefore(final long time) throws 
InvalidTruncateTimeException {
+    try {
+      return this.execute(
+        new Operation<Boolean>("truncateInvalidTxBefore") {
+          @Override
+          public Boolean execute(TransactionServiceThriftClient client) throws 
Exception {
+            return client.truncateInvalidTxBefore(time);
+          }
+        });
+    } catch (InvalidTruncateTimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public int getInvalidSize() {
+    try {
+      return this.execute(
+        new Operation<Integer>("getInvalidSize") {
+          @Override
+          public Integer execute(TransactionServiceThriftClient client) throws 
TException {
+            return client.getInvalidSize();
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
new file mode 100644
index 0000000..a7e9592
--- /dev/null
+++ 
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.distributed;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
+import 
org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
+import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
+import org.apache.tephra.distributed.thrift.TTransactionServer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class is a wrapper around the thrift tx service client, it takes
+ * Operations, converts them into thrift objects, calls the thrift
+ * client, and converts the results back to data fabric classes.
+ * This class also instruments the thrift calls with metrics.
+ */
+public class TransactionServiceThriftClient {
+  private static final Function<byte[], ByteBuffer> BYTES_WRAPPER = new 
Function<byte[], ByteBuffer>() {
+    @Override
+    public ByteBuffer apply(byte[] input) {
+      return ByteBuffer.wrap(input);
+    }
+  };
+
+  /**
+   * The thrift transport layer. We need this when we close the connection.
+   */
+  TTransport transport;
+
+  /**
+   * The actual thrift client.
+   */
+  TTransactionServer.Client client;
+
+  /**
+   * Whether this client is valid for use.
+   */
+  private final AtomicBoolean isValid = new AtomicBoolean(true);
+
+  /**
+   * Constructor from an existing, connected thrift transport.
+   *
+   * @param transport the thrift transport layer. It must already be connected
+   */
+  public TransactionServiceThriftClient(TTransport transport) {
+    this.transport = transport;
+    // thrift protocol layer, we use binary because so does the service
+    TProtocol protocol = new TBinaryProtocol(transport);
+    // and create a thrift client
+    this.client = new TTransactionServer.Client(protocol);
+  }
+
+  /**
+   * close this client. may be called multiple times
+   */
+  public void close() {
+    if (this.transport.isOpen()) {
+      this.transport.close();
+    }
+  }
+
+  public Transaction startLong() throws TException {
+    try {
+      return TransactionConverterUtils.unwrap(client.startLong());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public Transaction startShort() throws TException {
+    try {
+      return TransactionConverterUtils.unwrap(client.startShort());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public Transaction startShort(int timeout) throws TException {
+    try {
+      return 
TransactionConverterUtils.unwrap(client.startShortTimeout(timeout));
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
+    throws TException, TransactionNotInProgressException {
+    try {
+      return client.canCommitTx(TransactionConverterUtils.wrap(tx),
+                                
ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
+    } catch (TTransactionNotInProgressException e) {
+      throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+
+
+  public boolean commit(Transaction tx) throws TException, 
TransactionNotInProgressException {
+    try {
+      return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue();
+    } catch (TTransactionNotInProgressException e) {
+      throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public void abort(Transaction tx) throws TException {
+    try {
+      client.abortTx(TransactionConverterUtils.wrap(tx));
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public boolean invalidate(long tx) throws TException {
+    try {
+      return client.invalidateTx(tx);
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public Transaction checkpoint(Transaction tx) throws TException {
+    try {
+      return 
TransactionConverterUtils.unwrap(client.checkpoint(TransactionConverterUtils.wrap(tx)));
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public InputStream getSnapshotStream() throws TException, 
TransactionCouldNotTakeSnapshotException {
+    try {
+      ByteBuffer buffer = client.getSnapshot();
+      if (buffer.hasArray()) {
+        return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + 
buffer.position(), buffer.remaining());
+      }
+
+      // The ByteBuffer is not backed by array. Read the content to a new byte 
array and return an InputStream of that.
+      byte[] snapshot = new byte[buffer.remaining()];
+      buffer.get(snapshot);
+      return new ByteArrayInputStream(snapshot);
+    } catch (TTransactionCouldNotTakeSnapshotException e) {
+      throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public String status() throws TException {
+    try {
+      return client.status();
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public void resetState() throws TException {
+    try {
+        client.resetState();
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public boolean truncateInvalidTx(Set<Long> invalidTxIds) throws TException {
+    try {
+      return client.truncateInvalidTx(invalidTxIds).isValue();
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+  
+  public boolean truncateInvalidTxBefore(long time) throws TException, 
InvalidTruncateTimeException {
+    try {
+      return client.truncateInvalidTxBefore(time).isValue();
+    } catch (TInvalidTruncateTimeException e) {
+      throw new InvalidTruncateTimeException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+  
+  public int getInvalidSize() throws TException {
+    try {
+      return client.invalidTxSize();
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public boolean isValid() {
+    return isValid.get();
+  }
+}


Reply via email to