http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java deleted file mode 100644 index 2745c76..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.coprocessor; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.persist.HDFSTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.persist.TransactionVisibilityState; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.common.util.concurrent.AbstractIdleService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * Periodically refreshes transaction state from the latest stored snapshot. This is implemented as a singleton - * to allow a single cache to be shared by all regions on a regionserver. - */ -public class TransactionStateCache extends AbstractIdleService implements Configurable { - private static final Log LOG = LogFactory.getLog(TransactionStateCache.class); - - // how frequently we should wake to check for changes (in seconds) - private static final long CHECK_FREQUENCY = 15; - - private Configuration hConf; - - private TransactionStateStorage storage; - private volatile TransactionVisibilityState latestState; - - private Thread refreshService; - private long lastRefresh; - // snapshot refresh frequency in milliseconds - private long snapshotRefreshFrequency; - private boolean initialized; - - public TransactionStateCache() { - } - - @Override - public Configuration getConf() { - return hConf; - } - - @Override - public void setConf(Configuration conf) { - this.hConf = conf; - } - - @Override - protected void startUp() throws Exception { - refreshState(); - startRefreshService(); - } - - @Override - protected void shutDown() throws Exception { - this.refreshService.interrupt(); - this.storage.stop(); - } - - /** - * Try to initialize the Configuration and TransactionStateStorage instances. Obtaining the Configuration may - * fail until ReactorServiceMain has been started. - */ - private void tryInit() { - try { - Configuration conf = getSnapshotConfiguration(); - if (conf != null) { - // Since this is only used for background loading of transaction snapshots, we use the no-op metrics collector, - // as there are no relevant metrics to report - this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), - new TxMetricsCollector()); - this.storage.startAndWait(); - this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, - TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000; - this.initialized = true; - } else { - LOG.info("Could not load configuration"); - } - } catch (Exception e) { - LOG.info("Failed to initialize TransactionStateCache due to: " + e.getMessage()); - } - } - - protected Configuration getSnapshotConfiguration() throws IOException { - Configuration conf = new ConfigurationFactory().get(hConf); - conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); - return conf; - } - - private void reset() { - this.storage.stop(); - this.lastRefresh = 0; - this.initialized = false; - } - - private void startRefreshService() { - this.refreshService = new Thread("tx-state-refresh") { - @Override - public void run() { - while (!isInterrupted()) { - if (latestState == null || System.currentTimeMillis() > (lastRefresh + snapshotRefreshFrequency)) { - try { - refreshState(); - } catch (IOException ioe) { - LOG.info("Error refreshing transaction state cache: " + ioe.getMessage()); - } - } - try { - TimeUnit.SECONDS.sleep(CHECK_FREQUENCY); - } catch (InterruptedException ie) { - // reset status - interrupt(); - break; - } - } - LOG.info("Exiting thread " + getName()); - } - }; - this.refreshService.setDaemon(true); - this.refreshService.start(); - } - - private void refreshState() throws IOException { - if (!initialized) { - tryInit(); - } - - // only continue if initialization was successful - if (initialized) { - long now = System.currentTimeMillis(); - TransactionVisibilityState currentState = storage.getLatestTransactionVisibilityState(); - if (currentState != null) { - if (currentState.getTimestamp() < (now - 2 * snapshotRefreshFrequency)) { - LOG.info("Current snapshot is old, will force a refresh on next run."); - reset(); - } else { - latestState = currentState; - LOG.info("Transaction state reloaded with snapshot from " + latestState.getTimestamp()); - if (LOG.isDebugEnabled()) { - LOG.debug("Latest transaction snapshot: " + latestState.toString()); - } - lastRefresh = now; - } - } else { - LOG.info("No transaction state found."); - } - } - } - - public TransactionVisibilityState getLatestState() { - return latestState; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java deleted file mode 100644 index 9f4a778..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.coprocessor; - -import com.google.common.base.Supplier; -import org.apache.hadoop.conf.Configuration; - -/** - * Supplies instances of {@link TransactionStateCache} implementations. - */ -public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> { - protected static volatile TransactionStateCache instance; - protected static Object lock = new Object(); - - protected final Configuration conf; - - public TransactionStateCacheSupplier(Configuration conf) { - this.conf = conf; - } - - /** - * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary. - * @return A shared instance of the transaction state cache. - */ - @Override - public TransactionStateCache get() { - if (instance == null) { - synchronized (lock) { - if (instance == null) { - instance = new TransactionStateCache(); - instance.setConf(conf); - instance.start(); - } - } - } - return instance; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java deleted file mode 100644 index 5fa566c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains HBase coprocessor implementations for the transaction system. - */ -package co.cask.tephra.coprocessor; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java deleted file mode 100644 index d0a3603..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TxConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.apache.twill.discovery.Discoverable; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * An abstract tx client provider that implements common functionality. - */ -public abstract class AbstractClientProvider implements ThriftClientProvider { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractClientProvider.class); - - // Discovery service. If null, no service discovery. - private final DiscoveryServiceClient discoveryServiceClient; - protected final AtomicBoolean initialized = new AtomicBoolean(false); - - // the configuration - final Configuration configuration; - - // the endpoint strategy for service discovery. - EndpointStrategy endpointStrategy; - - protected AbstractClientProvider(Configuration configuration, DiscoveryServiceClient discoveryServiceClient) { - this.configuration = configuration; - this.discoveryServiceClient = discoveryServiceClient; - } - - public void initialize() throws TException { - if (initialized.compareAndSet(false, true)) { - this.initDiscovery(); - } - } - - /** - * Initialize the service discovery client, we will reuse that - * every time we need to create a new client. - */ - private void initDiscovery() { - if (discoveryServiceClient == null) { - LOG.info("No DiscoveryServiceClient provided. Skipping service discovery."); - return; - } - - endpointStrategy = new TimeLimitEndpointStrategy( - new RandomEndpointStrategy( - discoveryServiceClient.discover( - configuration.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME, - TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME))), - 2, TimeUnit.SECONDS); - } - - protected TransactionServiceThriftClient newClient() throws TException { - return newClient(-1); - } - - protected TransactionServiceThriftClient newClient(int timeout) throws TException { - initialize(); - String address; - int port; - - if (endpointStrategy == null) { - // if there is no discovery service, try to read host and port directly - // from the configuration - LOG.info("Reading address and port from configuration."); - address = configuration.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS, - TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS); - port = configuration.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, - TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT); - LOG.info("Service assumed at " + address + ":" + port); - } else { - Discoverable endpoint = endpointStrategy.pick(); - if (endpoint == null) { - LOG.error("Unable to discover tx service."); - throw new TException("Unable to discover tx service."); - } - address = endpoint.getSocketAddress().getHostName(); - port = endpoint.getSocketAddress().getPort(); - LOG.info("Service discovered at " + address + ":" + port); - } - - // now we have an address and port, try to connect a client - if (timeout < 0) { - timeout = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS); - } - LOG.info("Attempting to connect to tx service at " + - address + ":" + port + " with timeout " + timeout + " ms."); - // thrift transport layer - TTransport transport = - new TFramedTransport(new TSocket(address, port, timeout)); - try { - transport.open(); - } catch (TTransportException e) { - LOG.error("Unable to connect to tx service: " + e.getMessage()); - throw e; - } - // and create a thrift client - TransactionServiceThriftClient newClient = new TransactionServiceThriftClient(transport); - - LOG.info("Connected to tx service at " + - address + ":" + port); - return newClient; - } - - /** - * This class helps picking up an endpoint from a list of Discoverable. - */ - public interface EndpointStrategy { - - /** - * Picks a {@link Discoverable} using its strategy. - * @return A {@link Discoverable} based on the stragegy or {@code null} if no endpoint can be found. - */ - Discoverable pick(); - } - - /** - * An {@link EndpointStrategy} that make sure it picks an endpoint within the given - * timeout limit. - */ - public final class TimeLimitEndpointStrategy implements EndpointStrategy { - - private final EndpointStrategy delegate; - private final long timeout; - private final TimeUnit timeoutUnit; - - public TimeLimitEndpointStrategy(EndpointStrategy delegate, long timeout, TimeUnit timeoutUnit) { - this.delegate = delegate; - this.timeout = timeout; - this.timeoutUnit = timeoutUnit; - } - - @Override - public Discoverable pick() { - Discoverable pick = delegate.pick(); - try { - long count = 0; - while (pick == null && count++ < timeout) { - timeoutUnit.sleep(1); - pick = delegate.pick(); - } - } catch (InterruptedException e) { - // Simply propagate the interrupt. - Thread.currentThread().interrupt(); - } - return pick; - } - } - - /** - * Randomly picks endpoint from the list of available endpoints. - */ - public final class RandomEndpointStrategy implements EndpointStrategy { - - private final Iterable<Discoverable> endpoints; - - /** - * Constructs a random endpoint strategy. - * @param endpoints Endpoints for the strategy to use. Note that this strategy will - * invoke {@link Iterable#iterator()} and traverse through it on - * every call to the {@link #pick()} method. One could leverage this - * behavior with the live {@link Iterable} as provided by - * {@link org.apache.twill.discovery.DiscoveryServiceClient#discover(String)} method. - */ - public RandomEndpointStrategy(Iterable<Discoverable> endpoints) { - this.endpoints = endpoints; - } - - @Override - public Discoverable pick() { - // Reservoir sampling - Discoverable result = null; - Iterator<Discoverable> itor = endpoints.iterator(); - Random random = new Random(); - int count = 0; - while (itor.hasNext()) { - Discoverable next = itor.next(); - if (random.nextInt(++count) == 0) { - result = next; - } - } - return result; - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java deleted file mode 100644 index ba6da24..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -/** - * An {@link AutoCloseable} to automatically return the thrift client to the ThriftClientProvider. - */ -public class CloseableThriftClient implements AutoCloseable { - - private final ThriftClientProvider provider; - private final TransactionServiceThriftClient thriftClient; - - public CloseableThriftClient(ThriftClientProvider provider, TransactionServiceThriftClient thriftClient) { - this.provider = provider; - this.thriftClient = thriftClient; - } - - public TransactionServiceThriftClient getThriftClient() { - return thriftClient; - } - - @Override - public void close() { - // in any case, the client must be returned to the pool. The pool is - // responsible for discarding the client if it is in a bad state. - provider.returnClient(thriftClient); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java deleted file mode 100644 index 843ab90..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * An Elastic Pool is an object pool that can dynamically grow. - * Normally, an element is obtained by a client and then returned to the pool - * after use. However, if the element gets into a bad state, the element can - * be discarded, based upon the recycle() method returning false. This will - * cause the element to be removed from the pool, and for a subsequent request, - * a new element can be created on the fly to replace the discarded one. - * - * The pool starts with zero (active) elements. Every time a client attempts - * to obtain an element, an element from the pool is returned if available. - * Otherwise, if the number of active elements is less than the pool's limit, - * a new element is created (using abstract method create(), this must be - * overridden by all implementations), and the number of active elements is - * increased by one. If the limit is reached, then obtain() blocks until - * either an element is returned to the pool or, if the obtain method with timeout - * parameters is used, a timeout occurs. - * - * Every time an element is returned to the pool, it is "recycled" to restore its - * fresh state for the next use or destroyed, depending on its state. - * - * @param <T> the type of the elements - * @param <E> the type of exception thrown by create() - */ -public abstract class ElasticPool<T, E extends Exception> { - - private static final Logger LOG = - LoggerFactory.getLogger(ElasticPool.class); - - /** - * A method to create a new element. Will be called every time the pool - * of available elements is empty but the limit of active elements is - * not exceeded. - * @return a new element - */ - protected abstract T create() throws E; - - /** - * A method to recycle an existing element when it is returned to the pool. - * This methods ensures that the element is in a fresh state before it can - * be reused by the next agent. If the element is not to be returned to the pool, - * calling code is responsible for destroying it and returning false. - * - * @param element the element to recycle - * @return true to reuse element, false to remove it from the pool - */ - protected boolean recycle(T element) { - // by default, simply return true - return true; - } - - // holds all currently available elements - private final ConcurrentLinkedQueue<T> elements; - - // we keep track of elements via the permits of a semaphore, because there can - // be elements in a queue, but also elements that are "loaned out" count towards - // the pool's size limit - private final Semaphore semaphore; - - public ElasticPool(int sizeLimit) { - elements = new ConcurrentLinkedQueue<>(); - semaphore = new Semaphore(sizeLimit, true); - } - - /** - * Get a element from the pool. If there is an available element in - * the pool, it will be returned. Otherwise, if the number of active - * elements does not exceed the limit, a new element is created with - * create() and returned. Otherwise, blocks until an element is either - * released and returned to the pool, or an element is discarded, - * allowing for the creation of a new element. - * - * @return an element - */ - public T obtain() throws E, InterruptedException { - semaphore.acquire(); - return getOrCreate(); - } - - /** - * Get a element from the pool. If there is an available element in - * the pool, it will be returned. Otherwise, if the number of active - * elements does ot exceed the limit, a new element is created with - * create() and returned. Otherwise, blocks until an element is either - * released and returned to the pool, an element is discarded, - * allowing for the creation of a new element, or a timeout occurs. - * - * @param timeout the timeout for trying to obtain an element - * @param unit the timeout unit for trying to obtain an element - * @return an element - * @throws TimeoutException if a client is not able to be obtained within the given timeout - */ - public T obtain(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException { - if (!semaphore.tryAcquire(1, timeout, unit)) { - throw new TimeoutException(String.format("Failed to obtain client within %d %s.", - timeout, unit)); - } - return getOrCreate(); - } - - // gets an element from the queue, or creates one if there is none available in the queue. - // the semaphore must be acquired before calling this method. The semaphore will be released from within - // this method if it throws any exception - private T getOrCreate() throws E { - try { - T client = elements.poll(); - // a client was available, all good. otherwise, create one - if (client != null) { - return client; - } - return create(); - } catch (Exception e) { - // if an exception is thrown after acquiring the semaphore, release the - // semaphore before propagating the exception - semaphore.release(); - throw e; - } - } - - /** - * Returns an element to the pool of available elements. The element must - * have been obtained from this pool through obtain(). The recycle() method - * is called before the element is available for obtain(). - * - * @param element the element to be returned - */ - public void release(T element) { - try { - if (recycle(element)) { - elements.add(element); - } - } finally { - semaphore.release(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java deleted file mode 100644 index 3394811..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TxConstants; -import com.google.common.base.Throwables; -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.TException; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * This is an tx client provider that uses a bounded size pool of connections. - */ -public class PooledClientProvider extends AbstractClientProvider { - - private static final Logger LOG = - LoggerFactory.getLogger(PooledClientProvider.class); - - // we will use this as a pool of tx clients - class TxClientPool extends ElasticPool<TransactionServiceThriftClient, TException> { - TxClientPool(int sizeLimit) { - super(sizeLimit); - } - - @Override - protected TransactionServiceThriftClient create() throws TException { - return newClient(); - } - - @Override - protected boolean recycle(TransactionServiceThriftClient client) { - if (!client.isValid()) { - client.close(); - return false; - } - return true; - } - } - - // we will use this as a pool of tx clients - private volatile TxClientPool clients; - - // the limit for the number of active clients - private int maxClients; - // timeout, for obtaining a client - private long obtainClientTimeoutMs; - - public PooledClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) { - super(conf, discoveryServiceClient); - } - - private void initializePool() throws TException { - // initialize the super class (needed for service discovery) - super.initialize(); - - // create a (empty) pool of tx clients - maxClients = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_COUNT); - if (maxClients < 1) { - LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT + - " is invalid: value is " + maxClients + " but must be at least 1. " + - "Using 1 as a fallback. "); - maxClients = 1; - } - - obtainClientTimeoutMs = - configuration.getLong(TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS); - if (obtainClientTimeoutMs < 0) { - LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT + - " is invalid: value is " + obtainClientTimeoutMs + " but must be at least 0. " + - "Using 0 as a fallback. "); - obtainClientTimeoutMs = 0; - } - this.clients = new TxClientPool(maxClients); - } - - @Override - public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException { - TransactionServiceThriftClient client = getClientPool().obtain(obtainClientTimeoutMs, TimeUnit.MILLISECONDS); - return new CloseableThriftClient(this, client); - } - - @Override - public void returnClient(TransactionServiceThriftClient client) { - getClientPool().release(client); - } - - @Override - public String toString() { - return "Elastic pool of size " + this.maxClients + - ", with timeout (in milliseconds): " + this.obtainClientTimeoutMs; - } - - private TxClientPool getClientPool() { - if (clients != null) { - return clients; - } - - synchronized (this) { - if (clients == null) { - try { - initializePool(); - } catch (TException e) { - LOG.error("Failed to initialize Tx client provider", e); - throw Throwables.propagate(e); - } - } - } - return clients; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java deleted file mode 100644 index 22a4015..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TxConstants; -import org.apache.hadoop.conf.Configuration; - -/** - * A retry strategy that makes N attempts and then gives up. This does - * not do anything before the re-attempt - extend this class to add a - * sleep or similar. - */ -public class RetryNTimes extends RetryStrategy { - - int attempts = 0; - int limit; - - /** - * @param maxAttempts the number of attempts after which to stop - */ - protected RetryNTimes(int maxAttempts) { - limit = maxAttempts; - } - - @Override - boolean failOnce() { - ++attempts; - return attempts < limit; - } - - /** - * A retry strategy provider for this strategy. - */ - public static class Provider implements RetryStrategyProvider { - - int nTimes; - - public Provider() { - this.nTimes = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_ATTEMPTS; - } - - @Override - public void configure(Configuration config) { - nTimes = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, nTimes); - } - - @Override - public RetryStrategy newRetryStrategy() { - return new RetryNTimes(nTimes); - } - - @Override - public String toString() { - return nTimes + " attempts without delay"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java deleted file mode 100644 index 80ae779..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -/** - * A retry strategy is an abstraction over how the remote tx client shuold retry operations after connection - * failures. - */ -public abstract class RetryStrategy { - - /** - * Increments the number of failed attempts. - * @return whether another attempt should be made - */ - abstract boolean failOnce(); - - /** - * Should be called before re-attempting. This can, for instance - * inject a sleep time between retries. Default implementation is - * to do nothing. - */ - void beforeRetry() { - // do nothinhg - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java deleted file mode 100644 index 3ea2e94..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import org.apache.hadoop.conf.Configuration; - -/** - * A retry strategy provider is used by the tx client to get a new retry strategy for every call. - */ -public interface RetryStrategyProvider { - - /** - * Provides a new instance of a retry strategy. - * @return a retry strategy - */ - RetryStrategy newRetryStrategy(); - - /** - * Configure the strategy. - * @param config the configuration - */ - void configure(Configuration config); - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java deleted file mode 100644 index 3226ba7..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TxConstants; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A retry strategy that makes N attempts and then gives up. This does - * not do anything before the re-attempt - extend this class to add a - * sleep or similar. - */ -public class RetryWithBackoff extends RetryStrategy { - - private static final Logger LOG = - LoggerFactory.getLogger(RetryWithBackoff.class); - - int initialSleep; // initial sleep time - int backoffFactor; // factor by which to increase sleep for each retry - int maxSleep; // max sleep time. stop retrying when we exceed this - int sleep; // current sleep time - - /** - * @param initial the initial sleep time (before first retry) - * @param backoff the backoff factor by which sleep time is multiplied - * after each retry - * @param limit the max sleep time. if sleep time reaches this limit, we - * stop retrying - */ - protected RetryWithBackoff(int initial, int backoff, int limit) { - initialSleep = initial; - backoffFactor = backoff; - maxSleep = limit; - sleep = initialSleep; - } - - @Override - boolean failOnce() { - return sleep < maxSleep; - } - - @Override - void beforeRetry() { - LOG.info("Sleeping " + sleep + " ms before retry."); - long current = System.currentTimeMillis(); - long end = current + sleep; - while (current < end) { - try { - Thread.sleep(end - current); - } catch (InterruptedException e) { - // do nothing - } - current = System.currentTimeMillis(); - } - sleep = sleep * backoffFactor; - } - - /** - * A provider for this retry strategy. - */ - public static class Provider implements RetryStrategyProvider { - - int initialSleep; // initial sleep time - int backoffFactor; // factor by which to increase sleep for each retry - int maxSleep; // max sleep time. stop retrying when we exceed this - - public Provider() { - initialSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL; - backoffFactor = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR; - maxSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT; - } - - public void configure(Configuration config) { - initialSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_INITIAL, initialSleep); - backoffFactor = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_FACTOR, backoffFactor); - maxSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_LIMIT, maxSleep); - } - - @Override - public RetryStrategy newRetryStrategy() { - return new RetryWithBackoff(initialSleep, backoffFactor, maxSleep); - } - - @Override - public String toString() { - return "sleep " + initialSleep + " ms with back off factor " + - backoffFactor + " and limit " + maxSleep + " ms"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java deleted file mode 100644 index 8673ca4..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.TException; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeoutException; - -/** - * An tx client provider that creates a new connection every time. - */ -public class SingleUseClientProvider extends AbstractClientProvider { - - private static final Logger LOG = - LoggerFactory.getLogger(SingleUseClientProvider.class); - - public SingleUseClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient, int timeout) { - super(conf, discoveryServiceClient); - this.timeout = timeout; - } - - final int timeout; - - @Override - public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException { - try { - return new CloseableThriftClient(this, newClient(timeout)); - } catch (TException e) { - LOG.error("Unable to create new tx client: " + e.getMessage()); - throw e; - } - } - - @Override - public void returnClient(TransactionServiceThriftClient client) { - client.close(); - } - - @Override - public String toString() { - return "Single-use(timeout = " + timeout + ")"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java deleted file mode 100644 index eac75bd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.TException; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeoutException; - -/** - * An tx client provider that uses thread local to maintain at most one open connection per thread. - * Note that there can be a connection leak if the threads are recycled. - */ -public class ThreadLocalClientProvider extends AbstractClientProvider { - - private static final Logger LOG = - LoggerFactory.getLogger(ThreadLocalClientProvider.class); - - ThreadLocal<TransactionServiceThriftClient> clients = new ThreadLocal<>(); - - public ThreadLocalClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) { - super(conf, discoveryServiceClient); - } - - @Override - public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException { - TransactionServiceThriftClient client = this.clients.get(); - if (client == null) { - try { - client = this.newClient(); - clients.set(client); - } catch (TException e) { - LOG.error("Unable to create new tx client for thread: " - + e.getMessage()); - throw e; - } - } - return new CloseableThriftClient(this, client); - } - - @Override - public void returnClient(TransactionServiceThriftClient client) { - if (!client.isValid()) { - client.close(); - clients.remove(); - } - } - - @Override - public String toString() { - return "Thread-local"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java deleted file mode 100644 index 1e86c08..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import org.apache.thrift.TException; - -import java.util.concurrent.TimeoutException; - -/** - * This interface is used to provide thrift tx service clients: - * there is only one (singleton) - * tx service per JVM, but many threads may use it concurrently. - * However, being a thrift client, it is not thread-safe. In - * order to avoid serializing all tx calls by synchronizing - * on the tx service client, we employ a pool of clients. But - * in different scenarios there are different strategies for - * pooling: If there are many short-lived threads, it is wise - * to have a shared pool between all threads. But if there are - * few long-lived threads, it may be better to have thread-local - * client for each thread. - * - * This interface provides an abstraction of the pooling strategy. - */ -public interface ThriftClientProvider { - - /** - * Initialize the provider. At this point, it should be verified - * that tx service is up and running and getClient() can - * create new clients when necessary. - */ - void initialize() throws TException; - - /** - * Retrieve an AutoCloseable wrapper around tx client for exclusive use by the - * current thread. The client must be closed (returned) to the provider after use. - * @return an tx client, connected and fully functional - */ - CloseableThriftClient getCloseableClient() throws TException, - TimeoutException, InterruptedException; - - /** - * Release an tx client back to the provider's pool, if the client is valid. - * If the client becomes disfunctional, for instance, due to a socket - * exception. The provider must make sure to close the client, and it - * must remove the client from its arsenal and be prepared to create - * a new client subsequently. - * - * @param client The client to return - */ - void returnClient(TransactionServiceThriftClient client); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java deleted file mode 100644 index bda39fd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionType; -import co.cask.tephra.distributed.thrift.TTransaction; -import co.cask.tephra.distributed.thrift.TTransactionType; -import co.cask.tephra.distributed.thrift.TVisibilityLevel; -import com.google.common.primitives.Longs; - -/** - * Utility methods to convert to thrift and back. - */ -public final class TransactionConverterUtils { - private static final long[] EMPTY_LONG_ARRAY = {}; - - public static TTransaction wrap(Transaction tx) { - return new TTransaction(tx.getTransactionId(), tx.getReadPointer(), - Longs.asList(tx.getInvalids()), Longs.asList(tx.getInProgress()), - tx.getFirstShortInProgress(), getTTransactionType(tx.getType()), - tx.getWritePointer(), Longs.asList(tx.getCheckpointWritePointers()), - getTVisibilityLevel(tx.getVisibilityLevel())); - } - - public static Transaction unwrap(TTransaction thriftTx) { - return new Transaction(thriftTx.getReadPointer(), thriftTx.getTransactionId(), thriftTx.getWritePointer(), - thriftTx.getInvalids() == null ? EMPTY_LONG_ARRAY : Longs.toArray(thriftTx.getInvalids()), - thriftTx.getInProgress() == null ? EMPTY_LONG_ARRAY : - Longs.toArray(thriftTx.getInProgress()), - thriftTx.getFirstShort(), getTransactionType(thriftTx.getType()), - thriftTx.getCheckpointWritePointers() == null ? EMPTY_LONG_ARRAY : - Longs.toArray(thriftTx.getCheckpointWritePointers()), - getVisibilityLevel(thriftTx.getVisibilityLevel())); - } - - private static TransactionType getTransactionType(TTransactionType tType) { - return tType == TTransactionType.SHORT ? TransactionType.SHORT : TransactionType.LONG; - } - - private static TTransactionType getTTransactionType(TransactionType type) { - return type == TransactionType.SHORT ? TTransactionType.SHORT : TTransactionType.LONG; - } - - private static Transaction.VisibilityLevel getVisibilityLevel(TVisibilityLevel tLevel) { - // default to SNAPSHOT - if (tLevel == null) { - return Transaction.VisibilityLevel.SNAPSHOT; - } - - switch (tLevel) { - case SNAPSHOT: - return Transaction.VisibilityLevel.SNAPSHOT; - case SNAPSHOT_EXCLUDE_CURRENT: - return Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; - case SNAPSHOT_ALL: - return Transaction.VisibilityLevel.SNAPSHOT_ALL; - default: - throw new IllegalArgumentException("Unknown TVisibilityLevel: " + tLevel); - } - } - - private static TVisibilityLevel getTVisibilityLevel(Transaction.VisibilityLevel level) { - switch (level) { - case SNAPSHOT: - return TVisibilityLevel.SNAPSHOT; - case SNAPSHOT_EXCLUDE_CURRENT: - return TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; - case SNAPSHOT_ALL: - return TVisibilityLevel.SNAPSHOT_ALL; - default: - throw new IllegalArgumentException("Unknown VisibilityLevel: " + level); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java deleted file mode 100644 index 22e986d..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.TransactionManager; -import co.cask.tephra.distributed.thrift.TTransactionServer; -import co.cask.tephra.inmemory.InMemoryTransactionService; -import co.cask.tephra.rpc.ThriftRPCServer; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Uninterruptibles; -import com.google.inject.Inject; -import com.google.inject.Provider; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.api.ElectionHandler; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.internal.ServiceListenerAdapter; -import org.apache.twill.internal.zookeeper.LeaderElection; -import org.apache.twill.zookeeper.ZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * - */ -public final class TransactionService extends InMemoryTransactionService { - private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class); - private LeaderElection leaderElection; - private final ZKClient zkClient; - - private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server; - - @Inject - public TransactionService(Configuration conf, - ZKClient zkClient, - DiscoveryService discoveryService, - Provider<TransactionManager> txManagerProvider) { - super(conf, discoveryService, txManagerProvider); - this.zkClient = zkClient; - } - - @Override - protected InetSocketAddress getAddress() { - if (address.equals("0.0.0.0")) { - // resolve hostname - try { - return new InetSocketAddress(InetAddress.getLocalHost().getHostName(), server.getBindAddress().getPort()); - } catch (UnknownHostException x) { - LOG.error("Cannot resolve hostname for 0.0.0.0", x); - } - } - return server.getBindAddress(); - } - - @Override - protected void doStart() { - leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() { - @Override - public void leader() { - // if the txManager fails, we should stop the server - txManager = txManagerProvider.get(); - txManager.addListener(new ServiceListenerAdapter() { - @Override - public void failed(State from, Throwable failure) { - LOG.error("Transaction manager aborted, stopping transaction service"); - TransactionService.this.abort(failure); - } - }, MoreExecutors.sameThreadExecutor()); - - server = ThriftRPCServer.builder(TTransactionServer.class) - .setHost(address) - .setPort(port) - .setWorkerThreads(threads) - .setMaxReadBufferBytes(maxReadBufferBytes) - .setIOThreads(ioThreads) - .build(new TransactionServiceThriftHandler(txManager)); - try { - server.startAndWait(); - doRegister(); - LOG.info("Transaction Thrift Service started successfully on " + getAddress()); - } catch (Throwable t) { - LOG.info("Transaction Thrift Service didn't start on " + server.getBindAddress()); - leaderElection.stop(); - notifyFailed(t); - } - } - - @Override - public void follower() { - // First stop the transaction server as un-registering from discovery can block sometimes. - // That can lead to multiple transaction servers being active at the same time. - if (server != null && server.isRunning()) { - server.stopAndWait(); - } - undoRegister(); - } - }); - leaderElection.start(); - - notifyStarted(); - } - - @VisibleForTesting - State thriftRPCServerState() { - return server.state(); - } - - @Override - protected void doStop() { - internalStop(); - notifyStopped(); - } - - protected void abort(Throwable cause) { - // try to clear leader status and shutdown RPC - internalStop(); - notifyFailed(cause); - } - - protected void internalStop() { - if (leaderElection != null) { - // NOTE: if was a leader this will cause loosing of leadership which in callback above will - // de-register service in discovery service and stop the service if needed - try { - Uninterruptibles.getUninterruptibly(leaderElection.stop(), 5, TimeUnit.SECONDS); - } catch (TimeoutException te) { - LOG.warn("Timed out waiting for leader election cancellation to complete"); - } catch (ExecutionException e) { - LOG.error("Exception when cancelling leader election.", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java deleted file mode 100644 index 4f29730..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java +++ /dev/null @@ -1,473 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.InvalidTruncateTimeException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCouldNotTakeSnapshotException; -import co.cask.tephra.TransactionNotInProgressException; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.runtime.ConfigModule; -import co.cask.tephra.runtime.DiscoveryModules; -import co.cask.tephra.runtime.TransactionClientModule; -import co.cask.tephra.runtime.TransactionModules; -import co.cask.tephra.runtime.ZKModule; -import co.cask.tephra.util.ConfigurationFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; -import org.apache.hadoop.conf.Configuration; -import org.apache.thrift.TException; -import org.apache.twill.zookeeper.ZKClientService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; - -/** - * A tx service client - */ -public class TransactionServiceClient implements TransactionSystemClient { - - private static final Logger LOG = - LoggerFactory.getLogger(TransactionServiceClient.class); - - // we will use this to provide every call with an tx client - private ThriftClientProvider clientProvider; - - // the retry strategy we will use - private final RetryStrategyProvider retryStrategyProvider; - - /** - * Utility to be used for basic verification of transaction system availability and functioning - * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx - * @throws Exception - */ - public static void main(String[] args) throws Exception { - if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) { - System.out.println("USAGE: TransactionServiceClient [-v]"); - } - - boolean verbose = false; - if (args.length == 1 && "-v".equals(args[0])) { - verbose = true; - } - doMain(verbose, new ConfigurationFactory().get()); - } - - @VisibleForTesting - public static void doMain(boolean verbose, Configuration conf) throws Exception { - LOG.info("Starting tx server client test."); - Injector injector = Guice.createInjector( - new ConfigModule(conf), - new ZKModule(), - new DiscoveryModules().getDistributedModules(), - new TransactionModules().getDistributedModules(), - new TransactionClientModule() - ); - - ZKClientService zkClient = injector.getInstance(ZKClientService.class); - zkClient.startAndWait(); - - try { - TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class); - LOG.info("Starting tx..."); - Transaction tx = client.startShort(); - if (verbose) { - LOG.info("Started tx details: " + tx.toString()); - } else { - LOG.info("Started tx: " + tx.getTransactionId() + - ", readPointer: " + tx.getReadPointer() + - ", invalids: " + tx.getInvalids().length + - ", inProgress: " + tx.getInProgress().length); - } - LOG.info("Checking if canCommit tx..."); - boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList()); - LOG.info("canCommit: " + canCommit); - if (canCommit) { - LOG.info("Committing tx..."); - boolean committed = client.commit(tx); - LOG.info("Committed tx: " + committed); - if (!committed) { - LOG.info("Aborting tx..."); - client.abort(tx); - LOG.info("Aborted tx..."); - } - } else { - LOG.info("Aborting tx..."); - client.abort(tx); - LOG.info("Aborted tx..."); - } - } finally { - zkClient.stopAndWait(); - } - } - - /** - * Create from a configuration. This will first attempt to find a zookeeper - * for service discovery. Otherwise it will look for the port in the - * config and use localhost. - * @param config a configuration containing the zookeeper properties - */ - @Inject - public TransactionServiceClient(Configuration config, - ThriftClientProvider clientProvider) { - - // initialize the retry logic - String retryStrat = config.get( - TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY); - if ("backoff".equals(retryStrat)) { - this.retryStrategyProvider = new RetryWithBackoff.Provider(); - } else if ("n-times".equals(retryStrat)) { - this.retryStrategyProvider = new RetryNTimes.Provider(); - } else { - String message = "Unknown Retry Strategy '" + retryStrat + "'."; - LOG.error(message); - throw new IllegalArgumentException(message); - } - this.retryStrategyProvider.configure(config); - LOG.debug("Retry strategy is " + this.retryStrategyProvider); - - this.clientProvider = clientProvider; - } - - /** - * This is an abstract class that encapsulates an operation. It provides a - * method to attempt the actual operation, and it can throw an operation - * exception. - * @param <T> The return type of the operation - */ - abstract static class Operation<T> { - - /** the name of the operation. */ - String name; - - /** constructor with name of operation. */ - Operation(String name) { - this.name = name; - } - - /** return the name of the operation. */ - String getName() { - return name; - } - - /** execute the operation, given an tx client. */ - abstract T execute(TransactionServiceThriftClient client) - throws Exception; - } - - /** see execute(operation, client). */ - private <T> T execute(Operation<T> operation) throws Exception { - return execute(operation, null); - } - - /** - * This is a generic method implementing the somewhat complex execution - * and retry logic for operations, to avoid repetitive code. - * - * Attempts to execute one operation, by obtaining an tx client from - * the client provider and passing the operation to the client. If the - * call fails with a Thrift exception, apply the retry strategy. If no - * more retries are to be made according to the strategy, call the - * operation's error method to obtain a value to return. Note that error() - * may also throw an exception. Note also that the retry logic is only - * applied for thrift exceptions. - * - * @param operation The operation to be executed - * @param provider An tx client provider. If null, then a client will be - * obtained using the client provider - * @param <T> The return type of the operation - * @return the result of the operation, or a value returned by error() - */ - private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception { - RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy(); - while (true) { - // did we get a custom client provider or do we use the default? - if (provider == null) { - provider = this.clientProvider; - } - // this will throw a TException if it cannot get a client - try (CloseableThriftClient closeable = provider.getCloseableClient()) { - // note that this can throw exceptions other than TException - return operation.execute(closeable.getThriftClient()); - - } catch (TException te) { - // determine whether we should retry - boolean retry = retryStrategy.failOnce(); - if (!retry) { - // retry strategy is exceeded, throw an operation exception - String message = - "Thrift error for " + operation + ": " + te.getMessage(); - LOG.error(message); - LOG.debug(message, te); - throw new Exception(message, te); - } else { - // call retry strategy before retrying - retryStrategy.beforeRetry(); - String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage(); - LOG.info(msg); - LOG.debug(msg, te); - } - - } - } - } - - @Override - public Transaction startLong() { - try { - return execute( - new Operation<Transaction>("startLong") { - @Override - public Transaction execute(TransactionServiceThriftClient client) - throws TException { - return client.startLong(); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public Transaction startShort() { - try { - return execute( - new Operation<Transaction>("startShort") { - @Override - public Transaction execute(TransactionServiceThriftClient client) - throws TException { - return client.startShort(); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public Transaction startShort(final int timeout) { - try { - return execute( - new Operation<Transaction>("startShort") { - @Override - public Transaction execute(TransactionServiceThriftClient client) - throws TException { - return client.startShort(timeout); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds) - throws TransactionNotInProgressException { - - try { - return execute( - new Operation<Boolean>("canCommit") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws Exception { - return client.canCommit(tx, changeIds); - } - }); - } catch (TransactionNotInProgressException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean commit(final Transaction tx) throws TransactionNotInProgressException { - try { - return this.execute( - new Operation<Boolean>("commit") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws Exception { - return client.commit(tx); - } - }); - } catch (TransactionNotInProgressException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void abort(final Transaction tx) { - try { - this.execute( - new Operation<Boolean>("abort") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws TException { - client.abort(tx); - return true; - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean invalidate(final long tx) { - try { - return this.execute( - new Operation<Boolean>("invalidate") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws TException { - return client.invalidate(tx); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException { - try { - return this.execute( - new Operation<Transaction>("checkpoint") { - @Override - Transaction execute(TransactionServiceThriftClient client) throws Exception { - return client.checkpoint(tx); - } - } - ); - } catch (TransactionNotInProgressException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException { - try { - return this.execute( - new Operation<InputStream>("takeSnapshot") { - @Override - public InputStream execute(TransactionServiceThriftClient client) - throws Exception { - return client.getSnapshotStream(); - } - }); - } catch (TransactionCouldNotTakeSnapshotException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public String status() { - try { - return this.execute( - new Operation<String>("status") { - @Override - public String execute(TransactionServiceThriftClient client) throws Exception { - return client.status(); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void resetState() { - try { - this.execute( - new Operation<Boolean>("resetState") { - @Override - public Boolean execute(TransactionServiceThriftClient client) - throws TException { - client.resetState(); - return true; - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean truncateInvalidTx(final Set<Long> invalidTxIds) { - try { - return this.execute( - new Operation<Boolean>("truncateInvalidTx") { - @Override - public Boolean execute(TransactionServiceThriftClient client) throws TException { - return client.truncateInvalidTx(invalidTxIds); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException { - try { - return this.execute( - new Operation<Boolean>("truncateInvalidTxBefore") { - @Override - public Boolean execute(TransactionServiceThriftClient client) throws Exception { - return client.truncateInvalidTxBefore(time); - } - }); - } catch (InvalidTruncateTimeException e) { - throw e; - } catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public int getInvalidSize() { - try { - return this.execute( - new Operation<Integer>("getInvalidSize") { - @Override - public Integer execute(TransactionServiceThriftClient client) throws TException { - return client.getInvalidSize(); - } - }); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java deleted file mode 100644 index 01f9b4c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.distributed; - -import co.cask.tephra.InvalidTruncateTimeException; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCouldNotTakeSnapshotException; -import co.cask.tephra.TransactionNotInProgressException; -import co.cask.tephra.distributed.thrift.TInvalidTruncateTimeException; -import co.cask.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException; -import co.cask.tephra.distributed.thrift.TTransactionNotInProgressException; -import co.cask.tephra.distributed.thrift.TTransactionServer; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class is a wrapper around the thrift tx service client, it takes - * Operations, converts them into thrift objects, calls the thrift - * client, and converts the results back to data fabric classes. - * This class also instruments the thrift calls with metrics. - */ -public class TransactionServiceThriftClient { - private static final Function<byte[], ByteBuffer> BYTES_WRAPPER = new Function<byte[], ByteBuffer>() { - @Override - public ByteBuffer apply(byte[] input) { - return ByteBuffer.wrap(input); - } - }; - - /** - * The thrift transport layer. We need this when we close the connection. - */ - TTransport transport; - - /** - * The actual thrift client. - */ - TTransactionServer.Client client; - - /** - * Whether this client is valid for use. - */ - private final AtomicBoolean isValid = new AtomicBoolean(true); - - /** - * Constructor from an existing, connected thrift transport. - * - * @param transport the thrift transport layer. It must already be connected - */ - public TransactionServiceThriftClient(TTransport transport) { - this.transport = transport; - // thrift protocol layer, we use binary because so does the service - TProtocol protocol = new TBinaryProtocol(transport); - // and create a thrift client - this.client = new TTransactionServer.Client(protocol); - } - - /** - * close this client. may be called multiple times - */ - public void close() { - if (this.transport.isOpen()) { - this.transport.close(); - } - } - - public Transaction startLong() throws TException { - try { - return TransactionConverterUtils.unwrap(client.startLong()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public Transaction startShort() throws TException { - try { - return TransactionConverterUtils.unwrap(client.startShort()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public Transaction startShort(int timeout) throws TException { - try { - return TransactionConverterUtils.unwrap(client.startShortTimeout(timeout)); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean canCommit(Transaction tx, Collection<byte[]> changeIds) - throws TException, TransactionNotInProgressException { - try { - return client.canCommitTx(TransactionConverterUtils.wrap(tx), - ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue(); - } catch (TTransactionNotInProgressException e) { - throw new TransactionNotInProgressException(e.getMessage()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - - - public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException { - try { - return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue(); - } catch (TTransactionNotInProgressException e) { - throw new TransactionNotInProgressException(e.getMessage()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public void abort(Transaction tx) throws TException { - try { - client.abortTx(TransactionConverterUtils.wrap(tx)); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean invalidate(long tx) throws TException { - try { - return client.invalidateTx(tx); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public Transaction checkpoint(Transaction tx) throws TException { - try { - return TransactionConverterUtils.unwrap(client.checkpoint(TransactionConverterUtils.wrap(tx))); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public InputStream getSnapshotStream() throws TException, TransactionCouldNotTakeSnapshotException { - try { - ByteBuffer buffer = client.getSnapshot(); - if (buffer.hasArray()) { - return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); - } - - // The ByteBuffer is not backed by array. Read the content to a new byte array and return an InputStream of that. - byte[] snapshot = new byte[buffer.remaining()]; - buffer.get(snapshot); - return new ByteArrayInputStream(snapshot); - } catch (TTransactionCouldNotTakeSnapshotException e) { - throw new TransactionCouldNotTakeSnapshotException(e.getMessage()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public String status() throws TException { - try { - return client.status(); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public void resetState() throws TException { - try { - client.resetState(); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean truncateInvalidTx(Set<Long> invalidTxIds) throws TException { - try { - return client.truncateInvalidTx(invalidTxIds).isValue(); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean truncateInvalidTxBefore(long time) throws TException, InvalidTruncateTimeException { - try { - return client.truncateInvalidTxBefore(time).isValue(); - } catch (TInvalidTruncateTimeException e) { - throw new InvalidTruncateTimeException(e.getMessage()); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public int getInvalidSize() throws TException { - try { - return client.invalidTxSize(); - } catch (TException e) { - isValid.set(false); - throw e; - } - } - - public boolean isValid() { - return isValid.get(); - } -}
