http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java new file mode 100644 index 0000000..f1d62b8 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tephra.coprocessor;

import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.persist.HDFSTransactionStateStorage;
import org.apache.tephra.persist.TransactionStateStorage;
import org.apache.tephra.persist.TransactionVisibilityState;
import org.apache.tephra.snapshot.SnapshotCodecProvider;
import org.apache.tephra.util.ConfigurationFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Periodically refreshes transaction state from the latest stored snapshot. This is implemented as a singleton
 * to allow a single cache to be shared by all regions on a regionserver.
 */
public class TransactionStateCache extends AbstractIdleService implements Configurable {
  private static final Log LOG = LogFactory.getLog(TransactionStateCache.class);

  // how frequently we should wake to check for changes (in seconds)
  private static final long CHECK_FREQUENCY = 15;

  private Configuration hConf;

  // remains null until tryInit() succeeds at least once
  private TransactionStateStorage storage;
  private volatile TransactionVisibilityState latestState;

  private Thread refreshService;
  private long lastRefresh;
  // snapshot refresh frequency in milliseconds
  private long snapshotRefreshFrequency;
  private boolean initialized;

  public TransactionStateCache() {
  }

  @Override
  public Configuration getConf() {
    return hConf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.hConf = conf;
  }

  /**
   * Performs a first (best effort) refresh and then starts the background refresh thread.
   */
  @Override
  protected void startUp() throws Exception {
    refreshState();
    startRefreshService();
  }

  /**
   * Stops the background refresh thread and the snapshot storage.
   * Both are guarded against {@code null}: initialization failures are swallowed by {@link #tryInit()},
   * so the storage may never have been created by the time the service is shut down.
   */
  @Override
  protected void shutDown() throws Exception {
    Thread refresher = this.refreshService;
    if (refresher != null) {
      refresher.interrupt();
    }
    TransactionStateStorage currentStorage = this.storage;
    if (currentStorage != null) {
      currentStorage.stop();
    }
  }

  /**
   * Try to initialize the Configuration and TransactionStateStorage instances. Obtaining the Configuration may
   * fail until ReactorServiceMain has been started.
   * Any failure is logged and leaves {@code initialized} false so that the next refresh retries.
   */
  private void tryInit() {
    try {
      Configuration conf = getSnapshotConfiguration();
      if (conf != null) {
        // Since this is only used for background loading of transaction snapshots, we use the no-op metrics collector,
        // as there are no relevant metrics to report
        this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf),
                                                       new TxMetricsCollector());
        this.storage.startAndWait();
        // the configured interval is multiplied by 1000 to obtain milliseconds
        this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
                                                     TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
        this.initialized = true;
      } else {
        LOG.info("Could not load configuration");
      }
    } catch (Exception e) {
      LOG.info("Failed to initialize TransactionStateCache due to: " + e.getMessage());
    }
  }

  /**
   * Builds the configuration used to read snapshots, with the snapshot codec classes setting removed.
   *
   * @return the configuration, or {@code null} if the factory did not produce one
   * @throws IOException declared for overriding implementations
   */
  protected Configuration getSnapshotConfiguration() throws IOException {
    Configuration conf = new ConfigurationFactory().get(hConf);
    // guard the dereference so that a missing configuration reaches the "Could not load configuration"
    // branch in tryInit() instead of surfacing as a NullPointerException
    if (conf != null) {
      conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
    }
    return conf;
  }

  // stops the storage and marks the cache uninitialized, so the next refresh re-creates the storage
  private void reset() {
    this.storage.stop();
    this.lastRefresh = 0;
    this.initialized = false;
  }

  /**
   * Starts a daemon thread that wakes every {@code CHECK_FREQUENCY} seconds and refreshes the state if there
   * is no state yet or the last refresh is older than the snapshot refresh frequency.
   */
  private void startRefreshService() {
    this.refreshService = new Thread("tx-state-refresh") {
      @Override
      public void run() {
        while (!isInterrupted()) {
          if (latestState == null || System.currentTimeMillis() > (lastRefresh + snapshotRefreshFrequency)) {
            try {
              refreshState();
            } catch (IOException ioe) {
              LOG.info("Error refreshing transaction state cache: " + ioe.getMessage());
            }
          }
          try {
            TimeUnit.SECONDS.sleep(CHECK_FREQUENCY);
          } catch (InterruptedException ie) {
            // reset status
            interrupt();
            break;
          }
        }
        LOG.info("Exiting thread " + getName());
      }
    };
    this.refreshService.setDaemon(true);
    this.refreshService.start();
  }

  /**
   * Loads the latest visibility state from storage. A snapshot older than twice the refresh frequency is
   * not adopted; instead the storage is reset so that the next run starts from a fresh initialization.
   */
  private void refreshState() throws IOException {
    if (!initialized) {
      tryInit();
    }

    // only continue if initialization was successful
    if (initialized) {
      long now = System.currentTimeMillis();
      TransactionVisibilityState currentState = storage.getLatestTransactionVisibilityState();
      if (currentState != null) {
        if (currentState.getTimestamp() < (now - 2 * snapshotRefreshFrequency)) {
          LOG.info("Current snapshot is old, will force a refresh on next run.");
          reset();
        } else {
          latestState = currentState;
          LOG.info("Transaction state reloaded with snapshot from " + latestState.getTimestamp());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Latest transaction snapshot: " + latestState.toString());
          }
          lastRefresh = now;
        }
      } else {
        LOG.info("No transaction state found.");
      }
    }
  }

  /**
   * @return the most recently loaded visibility state, or {@code null} if none has been loaded yet
   */
  public TransactionVisibilityState getLatestState() {
    return latestState;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java new file mode 100644 index 0000000..d19da36 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.coprocessor; + +import com.google.common.base.Supplier; +import org.apache.hadoop.conf.Configuration; + +/** + * Supplies instances of {@link TransactionStateCache} implementations. 
+ */
public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> {
  protected static volatile TransactionStateCache instance;
  protected static Object lock = new Object();

  protected final Configuration conf;

  public TransactionStateCacheSupplier(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary.
   * The shared {@code instance} field is only assigned after the new cache has been configured and started,
   * so callers that take the unsynchronized fast path never observe a half-initialized cache.
   * @return A shared instance of the transaction state cache.
   */
  @Override
  public TransactionStateCache get() {
    TransactionStateCache cache = instance;
    if (cache == null) {
      synchronized (lock) {
        cache = instance;
        if (cache == null) {
          cache = new TransactionStateCache();
          cache.setConf(conf);
          cache.start();
          // publish last: the volatile write makes the configured, started cache visible to other threads
          instance = cache;
        }
      }
    }
    return cache;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java new file mode 100644 index 0000000..c554e37 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains HBase coprocessor implementations for the transaction system. + */ +package org.apache.tephra.coprocessor; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java b/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java new file mode 100644 index 0000000..3abdafa --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/AbstractClientProvider.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tephra.distributed;

import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TxConstants;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * An abstract tx client provider that implements common functionality.
 */
public abstract class AbstractClientProvider implements ThriftClientProvider {

  private static final Logger LOG = LoggerFactory.getLogger(AbstractClientProvider.class);

  // Discovery service. If null, no service discovery.
  private final DiscoveryServiceClient discoveryServiceClient;
  protected final AtomicBoolean initialized = new AtomicBoolean(false);

  // the configuration
  final Configuration configuration;

  // the endpoint strategy for service discovery.
  EndpointStrategy endpointStrategy;

  protected AbstractClientProvider(Configuration configuration, DiscoveryServiceClient discoveryServiceClient) {
    this.configuration = configuration;
    this.discoveryServiceClient = discoveryServiceClient;
  }

  /**
   * Initializes service discovery exactly once.
   * The {@code initialized} flag is only set after {@link #initDiscovery()} has completed, and concurrent
   * callers wait on the same monitor. No caller can therefore return from this method while the endpoint
   * strategy is still being set up and mistakenly conclude that there is no discovery service.
   */
  public void initialize() throws TException {
    if (initialized.get()) {
      return;
    }
    synchronized (initialized) {
      if (!initialized.get()) {
        this.initDiscovery();
        // set the flag last: the endpoint strategy is written before this volatile write, so it is
        // visible to every thread that subsequently reads the flag as true
        initialized.set(true);
      }
    }
  }

  /**
   * Initialize the service discovery client, we will reuse that
   * every time we need to create a new client.
   */
  private void initDiscovery() {
    if (discoveryServiceClient == null) {
      LOG.info("No DiscoveryServiceClient provided. Skipping service discovery.");
      return;
    }

    endpointStrategy = new TimeLimitEndpointStrategy(
      new RandomEndpointStrategy(
        discoveryServiceClient.discover(
          configuration.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
                            TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME))),
      2, TimeUnit.SECONDS);
  }

  /**
   * Creates a new client using the client timeout from the configuration.
   */
  protected TransactionServiceThriftClient newClient() throws TException {
    return newClient(-1);
  }

  /**
   * Creates a new, connected thrift client.
   *
   * @param timeout the socket timeout in milliseconds; a negative value means "use the configured timeout"
   * @return a client whose transport has been opened
   * @throws TException if no endpoint can be discovered or the transport cannot be opened
   */
  protected TransactionServiceThriftClient newClient(int timeout) throws TException {
    initialize();
    String address;
    int port;

    if (endpointStrategy == null) {
      // if there is no discovery service, try to read host and port directly
      // from the configuration
      LOG.info("Reading address and port from configuration.");
      address = configuration.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS,
                                  TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
      port = configuration.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT,
                                  TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
      LOG.info("Service assumed at " + address + ":" + port);
    } else {
      Discoverable endpoint = endpointStrategy.pick();
      if (endpoint == null) {
        LOG.error("Unable to discover tx service.");
        throw new TException("Unable to discover tx service.");
      }
      address = endpoint.getSocketAddress().getHostName();
      port = endpoint.getSocketAddress().getPort();
      LOG.info("Service discovered at " + address + ":" + port);
    }

    // now we have an address and port, try to connect a client
    if (timeout < 0) {
      timeout = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT,
                                     TxConstants.Service.DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS);
    }
    LOG.info("Attempting to connect to tx service at " +
               address + ":" + port + " with timeout " + timeout + " ms.");
    // thrift transport layer
    TTransport transport =
      new TFramedTransport(new TSocket(address, port, timeout));
    try {
      transport.open();
    } catch (TTransportException e) {
      // pass the exception itself so the cause and stack trace are not lost
      LOG.error("Unable to connect to tx service: " + e.getMessage(), e);
      throw e;
    }
    // and create a thrift client
    TransactionServiceThriftClient newClient = new TransactionServiceThriftClient(transport);

    LOG.info("Connected to tx service at " +
               address + ":" + port);
    return newClient;
  }

  /**
   * This class helps picking up an endpoint from a list of Discoverable.
   */
  public interface EndpointStrategy {

    /**
     * Picks a {@link Discoverable} using its strategy.
     * @return A {@link Discoverable} based on the strategy or {@code null} if no endpoint can be found.
     */
    Discoverable pick();
  }

  /**
   * An {@link EndpointStrategy} that makes sure it picks an endpoint within the given
   * timeout limit.
   */
  public final class TimeLimitEndpointStrategy implements EndpointStrategy {

    private final EndpointStrategy delegate;
    private final long timeout;
    private final TimeUnit timeoutUnit;

    public TimeLimitEndpointStrategy(EndpointStrategy delegate, long timeout, TimeUnit timeoutUnit) {
      this.delegate = delegate;
      this.timeout = timeout;
      this.timeoutUnit = timeoutUnit;
    }

    /**
     * Asks the delegate for an endpoint; while it has none, sleeps one {@code timeoutUnit} and retries,
     * for at most {@code timeout} retries.
     *
     * @return the picked endpoint, or {@code null} if none was found in time or the thread was interrupted
     */
    @Override
    public Discoverable pick() {
      Discoverable pick = delegate.pick();
      try {
        long count = 0;
        while (pick == null && count++ < timeout) {
          timeoutUnit.sleep(1);
          pick = delegate.pick();
        }
      } catch (InterruptedException e) {
        // Simply propagate the interrupt.
        Thread.currentThread().interrupt();
      }
      return pick;
    }
  }

  /**
   * Randomly picks endpoint from the list of available endpoints.
   */
  public final class RandomEndpointStrategy implements EndpointStrategy {

    private final Iterable<Discoverable> endpoints;
    // one generator per strategy instead of a new one for every pick
    private final Random random = new Random();

    /**
     * Constructs a random endpoint strategy.
     * @param endpoints Endpoints for the strategy to use. Note that this strategy will
     *                  invoke {@link Iterable#iterator()} and traverse through it on
     *                  every call to the {@link #pick()} method. One could leverage this
     *                  behavior with the live {@link Iterable} as provided by
     *                  {@link org.apache.twill.discovery.DiscoveryServiceClient#discover(String)} method.
     */
    public RandomEndpointStrategy(Iterable<Discoverable> endpoints) {
      this.endpoints = endpoints;
    }

    @Override
    public Discoverable pick() {
      // Reservoir sampling: the i-th element replaces the current choice with probability 1/i
      Discoverable result = null;
      Iterator<Discoverable> itor = endpoints.iterator();
      int count = 0;
      while (itor.hasNext()) {
        Discoverable next = itor.next();
        if (random.nextInt(++count) == 0) {
          result = next;
        }
      }
      return result;
    }
  }
}
+ http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java new file mode 100644 index 0000000..ee73ef0 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/CloseableThriftClient.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tephra.distributed;

/**
 * An {@link AutoCloseable} handle that gives its thrift client back to the
 * {@link ThriftClientProvider} it was created with when it is closed.
 */
public class CloseableThriftClient implements AutoCloseable {

  // the provider that receives the client again on close()
  private final ThriftClientProvider owner;
  // the client being lent out through this handle
  private final TransactionServiceThriftClient client;

  /**
   * @param provider the provider the client is returned to on {@link #close()}
   * @param thriftClient the client wrapped by this handle
   */
  public CloseableThriftClient(ThriftClientProvider provider, TransactionServiceThriftClient thriftClient) {
    owner = provider;
    client = thriftClient;
  }

  /**
   * @return the wrapped thrift client
   */
  public TransactionServiceThriftClient getThriftClient() {
    return client;
  }

  /**
   * Hands the wrapped client back to the provider.
   */
  @Override
  public void close() {
    // in any case, the client must be returned to the pool. The pool is
    // responsible for discarding the client if it is in a bad state.
    owner.returnClient(client);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java b/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java new file mode 100644 index 0000000..8ba3218 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/ElasticPool.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.tephra.distributed;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * An Elastic Pool is an object pool that can dynamically grow.
 * Normally, an element is obtained by a client and then returned to the pool
 * after use. However, if the element gets into a bad state, the element can
 * be discarded, based upon the recycle() method returning false. This will
 * cause the element to be removed from the pool, and for a subsequent request,
 * a new element can be created on the fly to replace the discarded one.
 *
 * The pool starts with zero (active) elements. Every time a client attempts
 * to obtain an element, an element from the pool is returned if available.
 * Otherwise, if the number of active elements is less than the pool's limit,
 * a new element is created (using abstract method create(), this must be
 * overridden by all implementations), and the number of active elements is
 * increased by one. If the limit is reached, then obtain() blocks until
 * either an element is returned to the pool or, if the obtain method with timeout
 * parameters is used, a timeout occurs.
 *
 * Every time an element is returned to the pool, it is "recycled" to restore its
 * fresh state for the next use or destroyed, depending on its state.
 *
 * @param <T> the type of the elements
 * @param <E> the type of exception thrown by create()
 */
public abstract class ElasticPool<T, E extends Exception> {

  private static final Logger LOG =
    LoggerFactory.getLogger(ElasticPool.class);

  /**
   * A method to create a new element. Will be called every time the pool
   * of available elements is empty but the limit of active elements is
   * not exceeded.
   * @return a new element
   */
  protected abstract T create() throws E;

  /**
   * A method to recycle an existing element when it is returned to the pool.
   * This methods ensures that the element is in a fresh state before it can
   * be reused by the next agent. If the element is not to be returned to the pool,
   * calling code is responsible for destroying it and returning false.
   *
   * @param element the element to recycle
   * @return true to reuse element, false to remove it from the pool
   */
  protected boolean recycle(T element) {
    // by default, simply return true
    return true;
  }

  // holds all currently available elements
  private final ConcurrentLinkedQueue<T> elements;

  // we keep track of elements via the permits of a semaphore, because there can
  // be elements in a queue, but also elements that are "loaned out" count towards
  // the pool's size limit
  private final Semaphore semaphore;

  public ElasticPool(int sizeLimit) {
    elements = new ConcurrentLinkedQueue<>();
    semaphore = new Semaphore(sizeLimit, true);
  }

  /**
   * Get an element from the pool. If there is an available element in
   * the pool, it will be returned. Otherwise, if the number of active
   * elements does not exceed the limit, a new element is created with
   * create() and returned. Otherwise, blocks until an element is either
   * released and returned to the pool, or an element is discarded,
   * allowing for the creation of a new element.
   *
   * @return an element
   */
  public T obtain() throws E, InterruptedException {
    semaphore.acquire();
    return getOrCreate();
  }

  /**
   * Get an element from the pool. If there is an available element in
   * the pool, it will be returned. Otherwise, if the number of active
   * elements does not exceed the limit, a new element is created with
   * create() and returned. Otherwise, blocks until an element is either
   * released and returned to the pool, an element is discarded,
   * allowing for the creation of a new element, or a timeout occurs.
   *
   * @param timeout the timeout for trying to obtain an element
   * @param unit the timeout unit for trying to obtain an element
   * @return an element
   * @throws TimeoutException if a client is not able to be obtained within the given timeout
   */
  public T obtain(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException {
    if (!semaphore.tryAcquire(1, timeout, unit)) {
      throw new TimeoutException(String.format("Failed to obtain client within %d %s.",
                                               timeout, unit));
    }
    return getOrCreate();
  }

  // gets an element from the queue, or creates one if there is none available in the queue.
  // the semaphore must be acquired before calling this method. The semaphore will be released from within
  // this method if it throws anything at all - including Errors and unchecked exceptions - since in that
  // case no element is handed out and the caller has nothing to release() later
  private T getOrCreate() throws E {
    boolean handedOut = false;
    try {
      T client = elements.poll();
      // a client was available, all good. otherwise, create one
      if (client == null) {
        client = create();
      }
      handedOut = true;
      return client;
    } finally {
      if (!handedOut) {
        // nothing was handed out, so give the permit back before the throwable propagates
        semaphore.release();
      }
    }
  }

  /**
   * Returns an element to the pool of available elements. The element must
   * have been obtained from this pool through obtain(). The recycle() method
   * is called before the element is available for obtain().
   * The permit is given back even if recycle() throws.
   *
   * @param element the element to be returned
   */
  public void release(T element) {
    try {
      if (recycle(element)) {
        elements.add(element);
      }
    } finally {
      semaphore.release();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java b/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java new file mode 100644 index 0000000..9df1441 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/PooledClientProvider.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.tephra.distributed;

import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.TxConstants;
import org.apache.thrift.TException;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * This is an tx client provider that uses a bounded size pool of connections.
 */
public class PooledClientProvider extends AbstractClientProvider {

  private static final Logger LOG =
    LoggerFactory.getLogger(PooledClientProvider.class);

  // we will use this as a pool of tx clients
  class TxClientPool extends ElasticPool<TransactionServiceThriftClient, TException> {
    TxClientPool(int sizeLimit) {
      super(sizeLimit);
    }

    @Override
    protected TransactionServiceThriftClient create() throws TException {
      return newClient();
    }

    // a client that is no longer valid is closed and dropped from the pool instead of being reused
    @Override
    protected boolean recycle(TransactionServiceThriftClient client) {
      if (!client.isValid()) {
        client.close();
        return false;
      }
      return true;
    }
  }

  // we will use this as a pool of tx clients
  private volatile TxClientPool clients;

  // the limit for the number of active clients
  private int maxClients;
  // timeout, for obtaining a client
  private long obtainClientTimeoutMs;

  public PooledClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
    super(conf, discoveryServiceClient);
  }

  /**
   * Reads the pool size and the obtain timeout from the configuration, falling back to the smallest
   * valid value when a setting is out of range, and creates the (empty) client pool.
   */
  private void initializePool() throws TException {
    // initialize the super class (needed for service discovery)
    super.initialize();

    // create a (empty) pool of tx clients
    maxClients = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT,
                                      TxConstants.Service.DEFAULT_DATA_TX_CLIENT_COUNT);
    if (maxClients < 1) {
      LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
                 " is invalid: value is " + maxClients + " but must be at least 1. " +
                 "Using 1 as a fallback. ");
      maxClients = 1;
    }

    obtainClientTimeoutMs =
      configuration.getLong(TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS,
                            TxConstants.Service.DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS);
    if (obtainClientTimeoutMs < 0) {
      // name the setting that is actually invalid (the obtain timeout, not the client count)
      LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS +
                 " is invalid: value is " + obtainClientTimeoutMs + " but must be at least 0. " +
                 "Using 0 as a fallback. ");
      obtainClientTimeoutMs = 0;
    }
    // assign the volatile pool reference last, after the limits above have been computed
    this.clients = new TxClientPool(maxClients);
  }

  /**
   * Obtains a client from the pool, waiting at most the configured obtain timeout.
   *
   * @return a handle that returns the client to this provider when closed
   * @throws TimeoutException if no client could be obtained within the timeout
   */
  @Override
  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
    TransactionServiceThriftClient client = getClientPool().obtain(obtainClientTimeoutMs, TimeUnit.MILLISECONDS);
    return new CloseableThriftClient(this, client);
  }

  @Override
  public void returnClient(TransactionServiceThriftClient client) {
    getClientPool().release(client);
  }

  @Override
  public String toString() {
    return "Elastic pool of size " + this.maxClients +
      ", with timeout (in milliseconds): " + this.obtainClientTimeoutMs;
  }

  // returns the client pool, creating it on first use; a failure to initialize is rethrown unchecked
  private TxClientPool getClientPool() {
    if (clients != null) {
      return clients;
    }

    synchronized (this) {
      if (clients == null) {
        try {
          initializePool();
        } catch (TException e) {
          LOG.error("Failed to initialize Tx client provider", e);
          throw Throwables.propagate(e);
        }
      }
    }
    return clients;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java new file mode 100644 index 0000000..8655c44 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryNTimes.java @@ -0,0 +1,73 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; + +/** + * A retry strategy that makes N attempts and then gives up. This does + * not do anything before the re-attempt - extend this class to add a + * sleep or similar. + */ +public class RetryNTimes extends RetryStrategy { + + int attempts = 0; + int limit; + + /** + * @param maxAttempts the number of attempts after which to stop + */ + protected RetryNTimes(int maxAttempts) { + limit = maxAttempts; + } + + @Override + boolean failOnce() { + ++attempts; + return attempts < limit; + } + + /** + * A retry strategy provider for this strategy. 
+ */ + public static class Provider implements RetryStrategyProvider { + + int nTimes; + + public Provider() { + this.nTimes = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_ATTEMPTS; + } + + @Override + public void configure(Configuration config) { + nTimes = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, nTimes); + } + + @Override + public RetryStrategy newRetryStrategy() { + return new RetryNTimes(nTimes); + } + + @Override + public String toString() { + return nTimes + " attempts without delay"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java new file mode 100644 index 0000000..6fd6c9f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryStrategy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Abstraction over how the remote tx client should retry an operation after a
 * connection failure.
 */
public abstract class RetryStrategy {

  /**
   * Hook that should be invoked before each re-attempt. An implementation can,
   * for instance, inject a sleep between retries. The default is a no-op.
   */
  void beforeRetry() {
    // nothing to do by default
  }

  /**
   * Records one more failed attempt.
   * @return whether another attempt should be made
   */
  abstract boolean failOnce();

}
+ */ + +package org.apache.tephra.distributed; + +import org.apache.hadoop.conf.Configuration; + +/** + * A retry strategy provider is used by the tx client to get a new retry strategy for every call. + */ +public interface RetryStrategyProvider { + + /** + * Provides a new instance of a retry strategy. + * @return a retry strategy + */ + RetryStrategy newRetryStrategy(); + + /** + * Configure the strategy. + * @param config the configuration + */ + void configure(Configuration config); + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java new file mode 100644 index 0000000..ce1130e --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/RetryWithBackoff.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A retry strategy that makes N attempts and then gives up. This does + * not do anything before the re-attempt - extend this class to add a + * sleep or similar. + */ +public class RetryWithBackoff extends RetryStrategy { + + private static final Logger LOG = + LoggerFactory.getLogger(RetryWithBackoff.class); + + int initialSleep; // initial sleep time + int backoffFactor; // factor by which to increase sleep for each retry + int maxSleep; // max sleep time. stop retrying when we exceed this + int sleep; // current sleep time + + /** + * @param initial the initial sleep time (before first retry) + * @param backoff the backoff factor by which sleep time is multiplied + * after each retry + * @param limit the max sleep time. if sleep time reaches this limit, we + * stop retrying + */ + protected RetryWithBackoff(int initial, int backoff, int limit) { + initialSleep = initial; + backoffFactor = backoff; + maxSleep = limit; + sleep = initialSleep; + } + + @Override + boolean failOnce() { + return sleep < maxSleep; + } + + @Override + void beforeRetry() { + LOG.info("Sleeping " + sleep + " ms before retry."); + long current = System.currentTimeMillis(); + long end = current + sleep; + while (current < end) { + try { + Thread.sleep(end - current); + } catch (InterruptedException e) { + // do nothing + } + current = System.currentTimeMillis(); + } + sleep = sleep * backoffFactor; + } + + /** + * A provider for this retry strategy. + */ + public static class Provider implements RetryStrategyProvider { + + int initialSleep; // initial sleep time + int backoffFactor; // factor by which to increase sleep for each retry + int maxSleep; // max sleep time. 
stop retrying when we exceed this + + public Provider() { + initialSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL; + backoffFactor = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR; + maxSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT; + } + + public void configure(Configuration config) { + initialSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_INITIAL, initialSleep); + backoffFactor = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_FACTOR, backoffFactor); + maxSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_LIMIT, maxSleep); + } + + @Override + public RetryStrategy newRetryStrategy() { + return new RetryWithBackoff(initialSleep, backoffFactor, maxSleep); + } + + @Override + public String toString() { + return "sleep " + initialSleep + " ms with back off factor " + + backoffFactor + " and limit " + maxSleep + " ms"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java b/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java new file mode 100644 index 0000000..d55da5e --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/SingleUseClientProvider.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TException; +import org.apache.twill.discovery.DiscoveryServiceClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * An tx client provider that creates a new connection every time. + */ +public class SingleUseClientProvider extends AbstractClientProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(SingleUseClientProvider.class); + + public SingleUseClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient, int timeout) { + super(conf, discoveryServiceClient); + this.timeout = timeout; + } + + final int timeout; + + @Override + public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException { + try { + return new CloseableThriftClient(this, newClient(timeout)); + } catch (TException e) { + LOG.error("Unable to create new tx client: " + e.getMessage()); + throw e; + } + } + + @Override + public void returnClient(TransactionServiceThriftClient client) { + client.close(); + } + + @Override + public String toString() { + return "Single-use(timeout = " + timeout + ")"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java b/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java new file mode 100644 index 0000000..dedaffe --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/ThreadLocalClientProvider.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TException; +import org.apache.twill.discovery.DiscoveryServiceClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeoutException; + +/** + * An tx client provider that uses thread local to maintain at most one open connection per thread. + * Note that there can be a connection leak if the threads are recycled. 
+ */ +public class ThreadLocalClientProvider extends AbstractClientProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(ThreadLocalClientProvider.class); + + ThreadLocal<TransactionServiceThriftClient> clients = new ThreadLocal<>(); + + public ThreadLocalClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) { + super(conf, discoveryServiceClient); + } + + @Override + public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException { + TransactionServiceThriftClient client = this.clients.get(); + if (client == null) { + try { + client = this.newClient(); + clients.set(client); + } catch (TException e) { + LOG.error("Unable to create new tx client for thread: " + + e.getMessage()); + throw e; + } + } + return new CloseableThriftClient(this, client); + } + + @Override + public void returnClient(TransactionServiceThriftClient client) { + if (!client.isValid()) { + client.close(); + clients.remove(); + } + } + + @Override + public String toString() { + return "Thread-local"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java b/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java new file mode 100644 index 0000000..6a41912 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/ThriftClientProvider.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import org.apache.thrift.TException; + +import java.util.concurrent.TimeoutException; + +/** + * This interface is used to provide thrift tx service clients: + * there is only one (singleton) + * tx service per JVM, but many threads may use it concurrently. + * However, being a thrift client, it is not thread-safe. In + * order to avoid serializing all tx calls by synchronizing + * on the tx service client, we employ a pool of clients. But + * in different scenarios there are different strategies for + * pooling: If there are many short-lived threads, it is wise + * to have a shared pool between all threads. But if there are + * few long-lived threads, it may be better to have thread-local + * client for each thread. + * + * This interface provides an abstraction of the pooling strategy. + */ +public interface ThriftClientProvider { + + /** + * Initialize the provider. At this point, it should be verified + * that tx service is up and running and getClient() can + * create new clients when necessary. + */ + void initialize() throws TException; + + /** + * Retrieve an AutoCloseable wrapper around tx client for exclusive use by the + * current thread. The client must be closed (returned) to the provider after use. 
+ * @return an tx client, connected and fully functional + */ + CloseableThriftClient getCloseableClient() throws TException, + TimeoutException, InterruptedException; + + /** + * Release an tx client back to the provider's pool, if the client is valid. + * If the client becomes disfunctional, for instance, due to a socket + * exception. The provider must make sure to close the client, and it + * must remove the client from its arsenal and be prepared to create + * a new client subsequently. + * + * @param client The client to return + */ + void returnClient(TransactionServiceThriftClient client); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java new file mode 100644 index 0000000..66e463d --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionConverterUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.primitives.Longs; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionType; +import org.apache.tephra.distributed.thrift.TTransaction; +import org.apache.tephra.distributed.thrift.TTransactionType; +import org.apache.tephra.distributed.thrift.TVisibilityLevel; + +/** + * Utility methods to convert to thrift and back. + */ +public final class TransactionConverterUtils { + private static final long[] EMPTY_LONG_ARRAY = {}; + + public static TTransaction wrap(Transaction tx) { + return new TTransaction(tx.getTransactionId(), tx.getReadPointer(), + Longs.asList(tx.getInvalids()), Longs.asList(tx.getInProgress()), + tx.getFirstShortInProgress(), getTTransactionType(tx.getType()), + tx.getWritePointer(), Longs.asList(tx.getCheckpointWritePointers()), + getTVisibilityLevel(tx.getVisibilityLevel())); + } + + public static Transaction unwrap(TTransaction thriftTx) { + return new Transaction(thriftTx.getReadPointer(), thriftTx.getTransactionId(), thriftTx.getWritePointer(), + thriftTx.getInvalids() == null ? EMPTY_LONG_ARRAY : Longs.toArray(thriftTx.getInvalids()), + thriftTx.getInProgress() == null ? EMPTY_LONG_ARRAY : + Longs.toArray(thriftTx.getInProgress()), + thriftTx.getFirstShort(), getTransactionType(thriftTx.getType()), + thriftTx.getCheckpointWritePointers() == null ? EMPTY_LONG_ARRAY : + Longs.toArray(thriftTx.getCheckpointWritePointers()), + getVisibilityLevel(thriftTx.getVisibilityLevel())); + } + + private static TransactionType getTransactionType(TTransactionType tType) { + return tType == TTransactionType.SHORT ? TransactionType.SHORT : TransactionType.LONG; + } + + private static TTransactionType getTTransactionType(TransactionType type) { + return type == TransactionType.SHORT ? 
TTransactionType.SHORT : TTransactionType.LONG; + } + + private static Transaction.VisibilityLevel getVisibilityLevel(TVisibilityLevel tLevel) { + // default to SNAPSHOT + if (tLevel == null) { + return Transaction.VisibilityLevel.SNAPSHOT; + } + + switch (tLevel) { + case SNAPSHOT: + return Transaction.VisibilityLevel.SNAPSHOT; + case SNAPSHOT_EXCLUDE_CURRENT: + return Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + case SNAPSHOT_ALL: + return Transaction.VisibilityLevel.SNAPSHOT_ALL; + default: + throw new IllegalArgumentException("Unknown TVisibilityLevel: " + tLevel); + } + } + + private static TVisibilityLevel getTVisibilityLevel(Transaction.VisibilityLevel level) { + switch (level) { + case SNAPSHOT: + return TVisibilityLevel.SNAPSHOT; + case SNAPSHOT_EXCLUDE_CURRENT: + return TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + case SNAPSHOT_ALL: + return TVisibilityLevel.SNAPSHOT_ALL; + default: + throw new IllegalArgumentException("Unknown VisibilityLevel: " + level); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java new file mode 100644 index 0000000..6fbf926 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Transaction service that takes part in a leader election (path "/tx.service/leader")
 * through the given ZooKeeper client. When this instance becomes leader it obtains a
 * transaction manager, starts a Thrift RPC server for it and registers the service;
 * when it becomes follower it stops that server and un-registers again.
 */
public final class TransactionService extends InMemoryTransactionService {
  private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
  private LeaderElection leaderElection;
  private final ZKClient zkClient;

  // Only assigned in the leader() callback, so it stays null until this instance is
  // elected leader for the first time.
  private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server;

  @Inject
  public TransactionService(Configuration conf,
                            ZKClient zkClient,
                            DiscoveryService discoveryService,
                            Provider<TransactionManager> txManagerProvider) {
    super(conf, discoveryService, txManagerProvider);
    this.zkClient = zkClient;
  }

  /**
   * Returns the address of the RPC server. If the configured address is the wildcard
   * "0.0.0.0", the local host name is combined with the server's bound port instead;
   * if the host name cannot be resolved, the server's bind address is returned as is.
   */
  @Override
  protected InetSocketAddress getAddress() {
    if (address.equals("0.0.0.0")) {
      // resolve hostname
      try {
        return new InetSocketAddress(InetAddress.getLocalHost().getHostName(), server.getBindAddress().getPort());
      } catch (UnknownHostException x) {
        // logged only; falls through to the bind address below
        LOG.error("Cannot resolve hostname for 0.0.0.0", x);
      }
    }
    return server.getBindAddress();
  }

  /**
   * Creates and starts the leader election, then immediately reports this service as
   * started; it does not wait for the election to be decided.
   */
  @Override
  protected void doStart() {
    leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() {
      @Override
      public void leader() {
        // if the txManager fails, we should stop the server
        txManager = txManagerProvider.get();
        txManager.addListener(new ServiceListenerAdapter() {
          @Override
          public void failed(State from, Throwable failure) {
            LOG.error("Transaction manager aborted, stopping transaction service");
            TransactionService.this.abort(failure);
          }
        }, MoreExecutors.sameThreadExecutor());

        server = ThriftRPCServer.builder(TTransactionServer.class)
          .setHost(address)
          .setPort(port)
          .setWorkerThreads(threads)
          .setMaxReadBufferBytes(maxReadBufferBytes)
          .setIOThreads(ioThreads)
          .build(new TransactionServiceThriftHandler(txManager));
        try {
          server.startAndWait();
          doRegister();
          LOG.info("Transaction Thrift Service started successfully on " + getAddress());
        } catch (Throwable t) {
          // Start-up or registration failed: stop the leader election and report the
          // failure. NOTE(review): this is logged at info level and without the
          // throwable - confirm that is intended.
          LOG.info("Transaction Thrift Service didn't start on " + server.getBindAddress());
          leaderElection.stop();
          notifyFailed(t);
        }
      }

      @Override
      public void follower() {
        // First stop the transaction server as un-registering from discovery can block sometimes.
        // That can lead to multiple transaction servers being active at the same time.
        if (server != null && server.isRunning()) {
          server.stopAndWait();
        }
        undoRegister();
      }
    });
    leaderElection.start();

    notifyStarted();
  }

  /**
   * Returns the state of the Thrift RPC server, for tests.
   * NOTE(review): dereferences {@code server} without a null check, so this presumably
   * must not be called before this instance has become leader - confirm with callers.
   */
  @VisibleForTesting
  State thriftRPCServerState() {
    return server.state();
  }

  /** Stops the leader election (see internalStop) and reports this service as stopped. */
  @Override
  protected void doStop() {
    internalStop();
    notifyStopped();
  }

  /**
   * Stops the leader election (see internalStop) and reports this service as failed.
   * @param cause the failure to report
   */
  protected void abort(Throwable cause) {
    // try to clear leader status and shutdown RPC
    internalStop();
    notifyFailed(cause);
  }

  /**
   * Stops the leader election, if one was created, waiting up to 5 seconds for that to
   * complete. A timeout or a failure while stopping is logged and not propagated.
   */
  protected void internalStop() {
    if (leaderElection != null) {
      // NOTE: if this was a leader, this will cause it to lose leadership, which in the callback
      //       above will de-register the service in the discovery service and stop the service if needed
      try {
        Uninterruptibles.getUninterruptibly(leaderElection.stop(), 5, TimeUnit.SECONDS);
      } catch (TimeoutException te) {
        LOG.warn("Timed out waiting for leader election cancellation to complete");
      } catch (ExecutionException e) {
        LOG.error("Exception when cancelling leader election.", e);
      }
    }
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.distributed; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.inject.Guice; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.tephra.util.ConfigurationFactory; +import org.apache.thrift.TException; +import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +/** + * A tx service client + */ +public class TransactionServiceClient implements TransactionSystemClient { + + private static final Logger LOG = + LoggerFactory.getLogger(TransactionServiceClient.class); + + // we will use this to provide every call with an tx client + private ThriftClientProvider clientProvider; + + // the retry strategy we will use + private final 
RetryStrategyProvider retryStrategyProvider; + + /** + * Utility to be used for basic verification of transaction system availability and functioning + * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx + * @throws Exception + */ + public static void main(String[] args) throws Exception { + if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) { + System.out.println("USAGE: TransactionServiceClient [-v]"); + } + + boolean verbose = false; + if (args.length == 1 && "-v".equals(args[0])) { + verbose = true; + } + doMain(verbose, new ConfigurationFactory().get()); + } + + @VisibleForTesting + public static void doMain(boolean verbose, Configuration conf) throws Exception { + LOG.info("Starting tx server client test."); + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + + ZKClientService zkClient = injector.getInstance(ZKClientService.class); + zkClient.startAndWait(); + + try { + TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class); + LOG.info("Starting tx..."); + Transaction tx = client.startShort(); + if (verbose) { + LOG.info("Started tx details: " + tx.toString()); + } else { + LOG.info("Started tx: " + tx.getTransactionId() + + ", readPointer: " + tx.getReadPointer() + + ", invalids: " + tx.getInvalids().length + + ", inProgress: " + tx.getInProgress().length); + } + LOG.info("Checking if canCommit tx..."); + boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList()); + LOG.info("canCommit: " + canCommit); + if (canCommit) { + LOG.info("Committing tx..."); + boolean committed = client.commit(tx); + LOG.info("Committed tx: " + committed); + if (!committed) { + LOG.info("Aborting tx..."); + client.abort(tx); + LOG.info("Aborted tx..."); + } + } else { + 
LOG.info("Aborting tx..."); + client.abort(tx); + LOG.info("Aborted tx..."); + } + } finally { + zkClient.stopAndWait(); + } + } + + /** + * Create from a configuration. This will first attempt to find a zookeeper + * for service discovery. Otherwise it will look for the port in the + * config and use localhost. + * @param config a configuration containing the zookeeper properties + */ + @Inject + public TransactionServiceClient(Configuration config, + ThriftClientProvider clientProvider) { + + // initialize the retry logic + String retryStrat = config.get( + TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, + TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY); + if ("backoff".equals(retryStrat)) { + this.retryStrategyProvider = new RetryWithBackoff.Provider(); + } else if ("n-times".equals(retryStrat)) { + this.retryStrategyProvider = new RetryNTimes.Provider(); + } else { + String message = "Unknown Retry Strategy '" + retryStrat + "'."; + LOG.error(message); + throw new IllegalArgumentException(message); + } + this.retryStrategyProvider.configure(config); + LOG.debug("Retry strategy is " + this.retryStrategyProvider); + + this.clientProvider = clientProvider; + } + + /** + * This is an abstract class that encapsulates an operation. It provides a + * method to attempt the actual operation, and it can throw an operation + * exception. + * @param <T> The return type of the operation + */ + abstract static class Operation<T> { + + /** the name of the operation. */ + String name; + + /** constructor with name of operation. */ + Operation(String name) { + this.name = name; + } + + /** return the name of the operation. */ + String getName() { + return name; + } + + /** execute the operation, given an tx client. */ + abstract T execute(TransactionServiceThriftClient client) + throws Exception; + } + + /** see execute(operation, client). 
*/ + private <T> T execute(Operation<T> operation) throws Exception { + return execute(operation, null); + } + + /** + * This is a generic method implementing the somewhat complex execution + * and retry logic for operations, to avoid repetitive code. + * + * Attempts to execute one operation, by obtaining an tx client from + * the client provider and passing the operation to the client. If the + * call fails with a Thrift exception, apply the retry strategy. If no + * more retries are to be made according to the strategy, call the + * operation's error method to obtain a value to return. Note that error() + * may also throw an exception. Note also that the retry logic is only + * applied for thrift exceptions. + * + * @param operation The operation to be executed + * @param provider An tx client provider. If null, then a client will be + * obtained using the client provider + * @param <T> The return type of the operation + * @return the result of the operation, or a value returned by error() + */ + private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception { + RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy(); + while (true) { + // did we get a custom client provider or do we use the default? 
+ if (provider == null) { + provider = this.clientProvider; + } + // this will throw a TException if it cannot get a client + try (CloseableThriftClient closeable = provider.getCloseableClient()) { + // note that this can throw exceptions other than TException + return operation.execute(closeable.getThriftClient()); + + } catch (TException te) { + // determine whether we should retry + boolean retry = retryStrategy.failOnce(); + if (!retry) { + // retry strategy is exceeded, throw an operation exception + String message = + "Thrift error for " + operation + ": " + te.getMessage(); + LOG.error(message); + LOG.debug(message, te); + throw new Exception(message, te); + } else { + // call retry strategy before retrying + retryStrategy.beforeRetry(); + String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage(); + LOG.info(msg); + LOG.debug(msg, te); + } + + } + } + } + + @Override + public Transaction startLong() { + try { + return execute( + new Operation<Transaction>("startLong") { + @Override + public Transaction execute(TransactionServiceThriftClient client) + throws TException { + return client.startLong(); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Transaction startShort() { + try { + return execute( + new Operation<Transaction>("startShort") { + @Override + public Transaction execute(TransactionServiceThriftClient client) + throws TException { + return client.startShort(); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Transaction startShort(final int timeout) { + try { + return execute( + new Operation<Transaction>("startShort") { + @Override + public Transaction execute(TransactionServiceThriftClient client) + throws TException { + return client.startShort(timeout); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean canCommit(final Transaction tx, final 
Collection<byte[]> changeIds) + throws TransactionNotInProgressException { + + try { + return execute( + new Operation<Boolean>("canCommit") { + @Override + public Boolean execute(TransactionServiceThriftClient client) + throws Exception { + return client.canCommit(tx, changeIds); + } + }); + } catch (TransactionNotInProgressException e) { + throw e; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean commit(final Transaction tx) throws TransactionNotInProgressException { + try { + return this.execute( + new Operation<Boolean>("commit") { + @Override + public Boolean execute(TransactionServiceThriftClient client) + throws Exception { + return client.commit(tx); + } + }); + } catch (TransactionNotInProgressException e) { + throw e; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void abort(final Transaction tx) { + try { + this.execute( + new Operation<Boolean>("abort") { + @Override + public Boolean execute(TransactionServiceThriftClient client) + throws TException { + client.abort(tx); + return true; + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean invalidate(final long tx) { + try { + return this.execute( + new Operation<Boolean>("invalidate") { + @Override + public Boolean execute(TransactionServiceThriftClient client) + throws TException { + return client.invalidate(tx); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException { + try { + return this.execute( + new Operation<Transaction>("checkpoint") { + @Override + Transaction execute(TransactionServiceThriftClient client) throws Exception { + return client.checkpoint(tx); + } + } + ); + } catch (TransactionNotInProgressException e) { + throw e; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public 
InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { + try { + return this.execute( + new Operation<InputStream>("takeSnapshot") { + @Override + public InputStream execute(TransactionServiceThriftClient client) + throws Exception { + return client.getSnapshotStream(); + } + }); + } catch (TransactionCouldNotTakeSnapshotException e) { + throw e; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public String status() { + try { + return this.execute( + new Operation<String>("status") { + @Override + public String execute(TransactionServiceThriftClient client) throws Exception { + return client.status(); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void resetState() { + try { + this.execute( + new Operation<Boolean>("resetState") { + @Override + public Boolean execute(TransactionServiceThriftClient client) + throws TException { + client.resetState(); + return true; + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean truncateInvalidTx(final Set<Long> invalidTxIds) { + try { + return this.execute( + new Operation<Boolean>("truncateInvalidTx") { + @Override + public Boolean execute(TransactionServiceThriftClient client) throws TException { + return client.truncateInvalidTx(invalidTxIds); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException { + try { + return this.execute( + new Operation<Boolean>("truncateInvalidTxBefore") { + @Override + public Boolean execute(TransactionServiceThriftClient client) throws Exception { + return client.truncateInvalidTxBefore(time); + } + }); + } catch (InvalidTruncateTimeException e) { + throw e; + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public int getInvalidSize() { + try { + return 
this.execute( + new Operation<Integer>("getInvalidSize") { + @Override + public Integer execute(TransactionServiceThriftClient client) throws TException { + return client.getInvalidSize(); + } + }); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java new file mode 100644 index 0000000..a7e9592 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.distributed; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.tephra.InvalidTruncateTimeException; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCouldNotTakeSnapshotException; +import org.apache.tephra.TransactionNotInProgressException; +import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException; +import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException; +import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException; +import org.apache.tephra.distributed.thrift.TTransactionServer; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is a wrapper around the thrift tx service client, it takes + * Operations, converts them into thrift objects, calls the thrift + * client, and converts the results back to data fabric classes. + * This class also instruments the thrift calls with metrics. + */ +public class TransactionServiceThriftClient { + private static final Function<byte[], ByteBuffer> BYTES_WRAPPER = new Function<byte[], ByteBuffer>() { + @Override + public ByteBuffer apply(byte[] input) { + return ByteBuffer.wrap(input); + } + }; + + /** + * The thrift transport layer. We need this when we close the connection. + */ + TTransport transport; + + /** + * The actual thrift client. + */ + TTransactionServer.Client client; + + /** + * Whether this client is valid for use. 
+ */ + private final AtomicBoolean isValid = new AtomicBoolean(true); + + /** + * Constructor from an existing, connected thrift transport. + * + * @param transport the thrift transport layer. It must already be connected + */ + public TransactionServiceThriftClient(TTransport transport) { + this.transport = transport; + // thrift protocol layer, we use binary because so does the service + TProtocol protocol = new TBinaryProtocol(transport); + // and create a thrift client + this.client = new TTransactionServer.Client(protocol); + } + + /** + * close this client. may be called multiple times + */ + public void close() { + if (this.transport.isOpen()) { + this.transport.close(); + } + } + + public Transaction startLong() throws TException { + try { + return TransactionConverterUtils.unwrap(client.startLong()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public Transaction startShort() throws TException { + try { + return TransactionConverterUtils.unwrap(client.startShort()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public Transaction startShort(int timeout) throws TException { + try { + return TransactionConverterUtils.unwrap(client.startShortTimeout(timeout)); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) + throws TException, TransactionNotInProgressException { + try { + return client.canCommitTx(TransactionConverterUtils.wrap(tx), + ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue(); + } catch (TTransactionNotInProgressException e) { + throw new TransactionNotInProgressException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + + + public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException { + try { + return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue(); + } catch (TTransactionNotInProgressException 
e) { + throw new TransactionNotInProgressException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public void abort(Transaction tx) throws TException { + try { + client.abortTx(TransactionConverterUtils.wrap(tx)); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public boolean invalidate(long tx) throws TException { + try { + return client.invalidateTx(tx); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public Transaction checkpoint(Transaction tx) throws TException { + try { + return TransactionConverterUtils.unwrap(client.checkpoint(TransactionConverterUtils.wrap(tx))); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public InputStream getSnapshotStream() throws TException, TransactionCouldNotTakeSnapshotException { + try { + ByteBuffer buffer = client.getSnapshot(); + if (buffer.hasArray()) { + return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + + // The ByteBuffer is not backed by array. Read the content to a new byte array and return an InputStream of that. 
+ byte[] snapshot = new byte[buffer.remaining()]; + buffer.get(snapshot); + return new ByteArrayInputStream(snapshot); + } catch (TTransactionCouldNotTakeSnapshotException e) { + throw new TransactionCouldNotTakeSnapshotException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public String status() throws TException { + try { + return client.status(); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public void resetState() throws TException { + try { + client.resetState(); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public boolean truncateInvalidTx(Set<Long> invalidTxIds) throws TException { + try { + return client.truncateInvalidTx(invalidTxIds).isValue(); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public boolean truncateInvalidTxBefore(long time) throws TException, InvalidTruncateTimeException { + try { + return client.truncateInvalidTxBefore(time).isValue(); + } catch (TInvalidTruncateTimeException e) { + throw new InvalidTruncateTimeException(e.getMessage()); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public int getInvalidSize() throws TException { + try { + return client.invalidTxSize(); + } catch (TException e) { + isValid.set(false); + throw e; + } + } + + public boolean isValid() { + return isValid.get(); + } +}
