http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java deleted file mode 100644 index 2bc51b5..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import com.google.inject.Provider; -import com.google.inject.ProvisionException; -import org.apache.twill.internal.utils.Instances; - -/** - * Common class factory behavior for classes which need specific implementations depending on HBase versions. - * Specific factories can subclass this class and simply plug in the class names for their implementations. - * - * @param <T> Version specific class provided by this factory. 
- */ -public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> { - /** - * Creates the implementation that matches the HBase version reported by {@code HBaseVersion.get()}. - * - * @return a new instance of the version specific class - * @throws ProvisionException if the HBase version is 0.94, unknown or not handled here, or if the implementation - * class cannot be found - */ - @Override - public T get() { - T instance = null; - try { - switch (HBaseVersion.get()) { - case HBASE_94: - throw new ProvisionException("HBase 0.94 is no longer supported. Please upgrade to HBase 0.96 or newer."); - case HBASE_96: - instance = createInstance(getHBase96Classname()); - break; - case HBASE_98: - instance = createInstance(getHBase98Classname()); - break; - case HBASE_10: - instance = createInstance(getHBase10Classname()); - break; - case HBASE_10_CDH: - instance = createInstance(getHBase10CDHClassname()); - break; - case HBASE_11: - case HBASE_12_CDH: - instance = createInstance(getHBase11Classname()); - break; - case UNKNOWN: - throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString()); - default: - throw new ProvisionException("Unsupported HBase version: " + HBaseVersion.getVersionString()); - } - } catch (ClassNotFoundException cnfe) { - throw new ProvisionException(cnfe.getMessage(), cnfe); - } - return instance; - } - - /** - * Loads the named class and instantiates it through {@code Instances.newInstance}. - * - * @param className fully qualified name of the implementation class; the subclass supplying it is expected to name - * a class assignable to {@code T}, which is why the cast to {@code T} is unchecked - * @return a new instance of the named class - * @throws ClassNotFoundException if the class cannot be loaded - */ - @SuppressWarnings("unchecked") - protected T createInstance(String className) throws ClassNotFoundException { - Class<?> clz = Class.forName(className); - return (T) Instances.newInstance(clz); - } - - protected abstract String getHBase96Classname(); - protected abstract String getHBase98Classname(); - protected abstract String getHBase10Classname(); - protected abstract String getHBase10CDHClassname(); - protected abstract String getHBase11Classname(); -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java b/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java deleted file mode 100644 index cdc9fef..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import co.cask.tephra.TxConstants; -import co.cask.tephra.persist.TransactionVisibilityState; -import com.google.common.primitives.Longs; - -import java.util.Map; - -/** - * Utility methods supporting transaction operations. - */ -public class TxUtils { - - // Any cell with timestamp less than MAX_NON_TX_TIMESTAMP is assumed to be pre-existing data, - // i.e. data written before table was converted into transactional table using Tephra. 
- // Using 1.1 times current time to determine whether a timestamp is a transactional timestamp or not is safe, and does - // not produce any false positives or false negatives. - // - // To prove this, let's say the earliest transactional timestamp written by Tephra was in year 2000, and the latest - // that will be written is in year 2200. - // 01-Jan-2000 GMT is 946684800000 milliseconds since epoch. - // 31-Dec-2200 GMT is 7289654399000 milliseconds since epoch. - // - // Let's say that we enabled transactions on a table on 01-Jan-2000, then 1.1 * 946684800000 = 31-Dec-2002. Using - // 31-Dec-2002, we can safely say from 01-Jan-2000 onwards, whether a cell timestamp is - // non-transactional (<= 946684800000). - // Note that transactional timestamp will be greater than 946684800000000000 (> year 31969) at this instant. - // - // On the other end, let's say we enabled transactions on a table on 31-Dec-2200, - // then 1.1 * 7289654399000 = 07-Feb-2224. Again, we can use this time from 31-Dec-2200 onwards to say whether a - // cell timestamp is non-transactional (<= 7289654399000). - // Note that transactional timestamp will be greater than 7289654399000000000 (> year 232969) at this instant. - private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1); - - /** - * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column - * family. If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}. 
- * @param ttlByFamily A map of column family name to TTL value (in milliseconds) - * @param tx The current transaction - * @return The oldest timestamp that will be visible for the given transaction and TTL configuration - */ - public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) { - long maxTTL = getMaxTTL(ttlByFamily); - // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it - return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0; - } - - /** - * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column - * family. If no TTL is set on any column family, the oldest visible timestamp will be {@code 0}. - * @param ttlByFamily A map of column family name to TTL value (in milliseconds) - * @param tx The current transaction - * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data - * @return The oldest timestamp that will be visible for the given transaction and TTL configuration - */ - public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) { - if (readNonTxnData) { - long maxTTL = getMaxTTL(ttlByFamily); - return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0; - } - - return getOldestVisibleTimestamp(ttlByFamily, tx); - } - - /** - * Returns the maximum timestamp to use for time-range operations, based on the given transaction. 
- * @param tx The current transaction - * @return The maximum timestamp (exclusive) to use for time-range operations - */ - public static long getMaxVisibleTimestamp(Transaction tx) { - // NOTE: +1 here because we want read up to writepointer inclusive, but timerange's end is exclusive - // however, we also need to guard against overflow in the case write pointer is set to MAX_VALUE - return tx.getWritePointer() < Long.MAX_VALUE ? - tx.getWritePointer() + 1 : tx.getWritePointer(); - } - - /** - * Creates a "dummy" transaction based on the given txVisibilityState's state. This is not a "real" transaction in - * the sense that it has not been started, data should not be written with it, and it cannot be committed. However, - * this can still be useful for filtering data according to the txVisibilityState's state. Instead of the actual - * write pointer from the txVisibilityState, however, we use {@code Long.MAX_VALUE} to avoid mis-identifying any cells - * as being written by this transaction (and therefore visible). - */ - public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) { - return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE, - Longs.toArray(txVisibilityState.getInvalid()), - Longs.toArray(txVisibilityState.getInProgress().keySet()), - TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT); - } - - /** - * Returns the write pointer for the first "short" transaction that in the in-progress set, or - * {@link Transaction#NO_TX_IN_PROGRESS} if none. 
- */ - public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) { - long firstShort = Transaction.NO_TX_IN_PROGRESS; - for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { - if (!entry.getValue().isLongRunning()) { - firstShort = entry.getKey(); - break; - } - } - return firstShort; - } - - /** - * Returns the timestamp for calculating time to live for the given cell timestamp. - * This takes into account pre-existing non-transactional cells while calculating the time. - */ - public static long getTimestampForTTL(long cellTs) { - return isPreExistingVersion(cellTs) ? cellTs * TxConstants.MAX_TX_PER_MS : cellTs; - } - - /** - * Returns the max TTL for the given TTL values. Returns Long.MAX_VALUE if any of the column families has no TTL set. - */ - private static long getMaxTTL(Map<byte[], Long> ttlByFamily) { - long maxTTL = 0; - for (Long familyTTL : ttlByFamily.values()) { - maxTTL = Math.max(familyTTL <= 0 ? Long.MAX_VALUE : familyTTL, maxTTL); - } - return maxTTL == 0 ? Long.MAX_VALUE : maxTTL; - } - - /** - * Returns true if version was written before table was converted into transactional table, false otherwise. - */ - public static boolean isPreExistingVersion(long version) { - return version < MAX_NON_TX_TIMESTAMP; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java deleted file mode 100644 index 3d5b25b..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import com.google.common.base.Stopwatch; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Default implementation of {@link FenceWait}. - */ -public class DefaultFenceWait implements FenceWait { - private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class); - - private final TransactionContext txContext; - - DefaultFenceWait(TransactionContext txContext) { - this.txContext = txContext; - } - - @Override - public void await(long timeout, TimeUnit timeUnit) - throws TransactionFailureException, InterruptedException, TimeoutException { - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - long sleepTimeMicros = timeUnit.toMicros(timeout) / 10; - // Have sleep time to be within 1 microsecond and 500 milliseconds - sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1); - while (stopwatch.elapsedTime(timeUnit) < timeout) { - txContext.start(); - try { - txContext.finish(); - return; - } catch (TransactionFailureException e) { - LOG.error("Got exception waiting for fence. 
Sleeping for {} microseconds", sleepTimeMicros, e); - txContext.abort(); - TimeUnit.MICROSECONDS.sleep(sleepTimeMicros); - } - } - throw new TimeoutException("Timeout waiting for fence"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java deleted file mode 100644 index a5370b2..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.TransactionFailureException; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions. - */ -public interface FenceWait { - /** - * Waits until the fence is complete, or till the timeout specified. 
The fence wait transaction will get re-tried - * several times until the timeout. - * <p> - * - * If a fence wait times out then it means there are still some readers with in-progress transactions that have not - * seen the change. In this case the wait will have to be retried using the same FenceWait object. - * - * @param timeout Maximum time to wait - * @param timeUnit {@link TimeUnit} of the timeout value - * @throws TransactionFailureException when not able to start fence wait transaction - * @throws InterruptedException on any interrupt - * @throws TimeoutException when timeout is reached - */ - void await(long timeout, TimeUnit timeUnit) - throws TransactionFailureException, InterruptedException, TimeoutException; -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java deleted file mode 100644 index f7b8850..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionAware; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.Longs; - -import java.util.Collection; -import java.util.Collections; - -/** - * Implementation of {@link VisibilityFence} used by reader. - */ -class ReadFence implements TransactionAware { - private final byte[] fenceId; - private Transaction tx; - - public ReadFence(byte[] fenceId) { - this.fenceId = fenceId; - } - - @Override - public void startTx(Transaction tx) { - this.tx = tx; - } - - @Override - public void updateTx(Transaction tx) { - // Fences only need original transaction - } - - @Override - public Collection<byte[]> getTxChanges() { - if (tx == null) { - throw new IllegalStateException("Transaction has not started yet"); - } - return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId()))); - } - - @Override - public boolean commitTx() throws Exception { - // Nothing to persist - return true; - } - - @Override - public void postTxCommit() { - tx = null; - } - - @Override - public boolean rollbackTx() throws Exception { - // Nothing to rollback - return true; - } - - @Override - public String getTransactionAwareName() { - return getClass().getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java deleted file mode 100644 index beb92ed..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TransactionContext; -import co.cask.tephra.TransactionFailureException; -import co.cask.tephra.TransactionSystemClient; - -import java.util.concurrent.TimeoutException; - -/** - * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change - * that got committed. - * <p> - * - * Typically a reader will never conflict with a writer, since a reader only sees committed changes when its - * transaction started. However to ensure that after a given point all readers are aware of a change, - * we have to introduce a conflict between a reader and a writer that act on the same data concurrently. - *<p> - * - * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence - * in its transaction. If there are no changes to the data when reader tries to commit the transaction - * containing the fence, the commit succeeds. - * <p> - * - * On the other hand, a writer updates the same data in a transaction. 
After the write transaction is committed, - * the writer then waits on the fence to ensure that all in-progress readers are aware of this update. - * When the wait on the fence returns successfully, it means that - * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will - * force the readers to start a new transaction, and this ensures that the changes made by writer are visible - * to the readers. - *<p> - * - * In case an in-progress reader commits when the writer is waiting on the fence, then the wait method will retry - * until the given timeout. - * <p> - * - * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits after - * this point onwards would see the change. - * - * <p> - * Sample reader code: - * <pre> - * <code> - * TransactionAware fence = VisibilityFence.create(fenceId); - * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...); - * readTxContext.start(); - * - * // do operations using table1, table2, etc. - * - * // finally commit - * try { - * readTxContext.finish(); - * } catch (TransactionConflictException e) { - * // handle conflict by aborting and starting over with a new transaction - * } - * </code> - * </pre> - *<p> - * - * Sample writer code: - * <pre> - * <code> - * // start transaction - * // write change - * // commit transaction - * - * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are - * // aware of this change - * try { - * FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient); - * fenceWait.await(50000, TimeUnit.MILLISECONDS); - * } catch (TimeoutException e) { - * // await timed out, the change may not be visible to all in-progress readers. - * // Application has two options at this point: - * // 1. Revert the write. Re-try the write and fence wait again. - * // 2. 
Retry only the wait with the same fenceWait object (retry logic is not shown here). - * } - * </code> - * </pre> - * - * fenceId in the above samples refers to any id that both the readers and writer know for a given - * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data. - * Typically fenceId uniquely identifies the data in question. - * For example, if the data is a table row, the fenceId can be composed of table name and row key. - * If the data is a table cell, the fenceId can be composed of table name, row key, and column key. - *<p> - * - * Note that in this implementation, any reader that starts a transaction after the write is committed, and - * while this read transaction is in-progress, if a writer successfully starts and completes an await on the fence then - * this reader will get a conflict while committing the fence even though this reader has seen the latest changes. - * This is because today there is no way to determine the commit time of a transaction. - */ -public final class VisibilityFence { - private VisibilityFence() { - // Cannot instantiate this class, all functionality is through static methods. - } - - /** - * Used by a reader to get a fence that can be added to its transaction context. - * - * @param fenceId uniquely identifies the data that this fence is used to synchronize. - * If the data is a table cell then this id can be composed of the table name, row key - * and column key for the data. - * @return {@link TransactionAware} to be added to reader's transaction context. - */ - public static TransactionAware create(byte[] fenceId) { - return new ReadFence(fenceId); - } - - /** - * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions. - * - * @param fenceId uniquely identifies the data that this fence is used to synchronize. 
- * If the data is a table cell then this id can be composed of the table name, row key - * and column key for the data. - * @return {@link FenceWait} object - */ - public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient) - throws TransactionFailureException, InterruptedException, TimeoutException { - return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java deleted file mode 100644 index a16264d..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra.visibility; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionAware; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.Longs; -import com.google.common.primitives.UnsignedBytes; - -import java.util.Collection; -import java.util.TreeSet; - -/** - * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}. - */ -class WriteFence implements TransactionAware { - private final byte[] fenceId; - private Transaction tx; - private Collection<byte[]> inProgressChanges; - - public WriteFence(byte[] fenceId) { - this.fenceId = fenceId; - } - - @Override - public void startTx(Transaction tx) { - this.tx = tx; - if (inProgressChanges == null) { - inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator()); - for (long inProgressTx : tx.getInProgress()) { - inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx))); - } - } - } - - @Override - public void updateTx(Transaction tx) { - // Fences only need original transaction - } - - @Override - public Collection<byte[]> getTxChanges() { - if (inProgressChanges == null || tx == null) { - throw new IllegalStateException("Transaction has not started yet"); - } - return inProgressChanges; - } - - @Override - public boolean commitTx() throws Exception { - // Nothing to persist - return true; - } - - @Override - public void postTxCommit() { - tx = null; - } - - @Override - public boolean rollbackTx() throws Exception { - // Nothing to rollback - return true; - } - - @Override - public String getTransactionAwareName() { - return getClass().getSimpleName(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java 
b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java deleted file mode 100644 index ceaffad..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.zookeeper; - -import org.apache.twill.zookeeper.ACLData; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; - -import java.util.List; - -/** - * A straightforward implementation of {@link ACLData}. 
- */ -final class BasicACLData implements ACLData { - - private final List<ACL> acl; - private final Stat stat; - - BasicACLData(List<ACL> acl, Stat stat) { - this.acl = acl; - this.stat = stat; - } - - @Override - public List<ACL> getACL() { - return acl; - } - - @Override - public Stat getStat() { - return stat; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java deleted file mode 100644 index ce81ade..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.zookeeper; - -import com.google.common.base.Objects; -import org.apache.twill.zookeeper.NodeChildren; -import org.apache.zookeeper.data.Stat; - -import java.util.List; - -/** - * Implementation of the {@link NodeChildren}. 
- */ -final class BasicNodeChildren implements NodeChildren { - - private final Stat stat; - private final List<String> children; - - BasicNodeChildren(List<String> children, Stat stat) { - this.stat = stat; - this.children = children; - } - - @Override - public Stat getStat() { - return stat; - } - - @Override - public List<String> getChildren() { - return children; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || !(o instanceof NodeChildren)) { - return false; - } - - NodeChildren that = (NodeChildren) o; - return stat.equals(that.getStat()) && children.equals(that.getChildren()); - } - - @Override - public int hashCode() { - return Objects.hashCode(children, stat); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java deleted file mode 100644 index b09ff4b..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.zookeeper; - -import com.google.common.base.Objects; -import org.apache.twill.zookeeper.NodeData; -import org.apache.zookeeper.data.Stat; - -import java.util.Arrays; - -/** - * A straightforward implementation for {@link NodeData}. - */ -final class BasicNodeData implements NodeData { - - private final byte[] data; - private final Stat stat; - - BasicNodeData(byte[] data, Stat stat) { - this.data = data; - this.stat = stat; - } - - @Override - public Stat getStat() { - return stat; - } - - @Override - public byte[] getData() { - return data; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || !(o instanceof NodeData)) { - return false; - } - - BasicNodeData that = (BasicNodeData) o; - - return stat.equals(that.getStat()) && Arrays.equals(data, that.getData()); - } - - @Override - public int hashCode() { - return Objects.hashCode(data, stat); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java deleted file mode 100644 index 5f6f565..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.zookeeper; - -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import org.apache.twill.common.Cancellable; -import org.apache.twill.common.Threads; -import org.apache.twill.internal.zookeeper.SettableOperationFuture; -import org.apache.twill.zookeeper.ACLData; -import org.apache.twill.zookeeper.NodeChildren; -import org.apache.twill.zookeeper.NodeData; -import org.apache.twill.zookeeper.OperationFuture; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.zookeeper.AsyncCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import 
java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; - -/** - * The implementation of {@link ZKClientService}. - */ -public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher { - - private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class); - - private final String zkStr; - private final int sessionTimeout; - private final List<Watcher> connectionWatchers; - private final Multimap<String, byte[]> authInfos; - private final AtomicReference<ZooKeeper> zooKeeper; - private final Runnable stopTask; - private ExecutorService eventExecutor; - - /** - * Create a new instance. - * @param zkStr zookeper connection string - * @param sessionTimeout timeout in milliseconds - * @param connectionWatcher watcher to set - * @param authInfos authorization bytes - */ - public TephraZKClientService(String zkStr, int sessionTimeout, - Watcher connectionWatcher, Multimap<String, byte[]> authInfos) { - this.zkStr = zkStr; - this.sessionTimeout = sessionTimeout; - this.connectionWatchers = new CopyOnWriteArrayList<>(); - this.authInfos = copyAuthInfo(authInfos); - addConnectionWatcher(connectionWatcher); - - this.zooKeeper = new AtomicReference<>(); - this.stopTask = createStopTask(); - } - - @Override - public Long getSessionId() { - ZooKeeper zk = zooKeeper.get(); - return zk == null ? 
null : zk.getSessionId(); - } - - @Override - public String getConnectString() { - return zkStr; - } - - @Override - public Cancellable addConnectionWatcher(final Watcher watcher) { - if (watcher == null) { - return new Cancellable() { - @Override - public void cancel() { - // No-op - } - }; - } - - // Invocation of connection watchers are already done inside the event thread, - // hence no need to wrap the watcher again. - connectionWatchers.add(watcher); - return new Cancellable() { - @Override - public void cancel() { - connectionWatchers.remove(watcher); - } - }; - } - - @Override - public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) { - return create(path, data, createMode, true); - } - - @Override - public OperationFuture<String> create(String path, @Nullable byte[] data, - CreateMode createMode, boolean createParent) { - return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE); - } - - @Override - public OperationFuture<String> create(String path, @Nullable byte[] data, - CreateMode createMode, Iterable<ACL> acl) { - return create(path, data, createMode, true, acl); - } - - @Override - public OperationFuture<Stat> exists(String path) { - return exists(path, null); - } - - @Override - public OperationFuture<NodeChildren> getChildren(String path) { - return getChildren(path, null); - } - - @Override - public OperationFuture<NodeData> getData(String path) { - return getData(path, null); - } - - @Override - public OperationFuture<Stat> setData(String path, byte[] data) { - return setData(path, data, -1); - } - - @Override - public OperationFuture<String> delete(String path) { - return delete(path, -1); - } - - @Override - public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) { - return setACL(path, acl, -1); - } - - @Override - public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, - boolean createParent, Iterable<ACL> acl) { - return 
doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false); - } - - private OperationFuture<String> doCreate(final String path, - @Nullable final byte[] data, - final CreateMode createMode, - final boolean createParent, - final List<ACL> acl, - final boolean ignoreNodeExists) { - final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture); - if (!createParent) { - return createFuture; - } - - // If create parent is request, return a different future - final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor); - // Watch for changes in the original future - Futures.addCallback(createFuture, new FutureCallback<String>() { - @Override - public void onSuccess(String path) { - // Propagate if creation was successful - result.set(path); - } - - @Override - public void onFailure(Throwable t) { - // See if the failure can be handled - if (updateFailureResult(t, result, path, ignoreNodeExists)) { - return; - } - // Create the parent node - String parentPath = getParent(path); - if (parentPath.isEmpty()) { - result.setException(t); - return; - } - // Watch for parent creation complete. Parent is created with the unsafe ACL. 
- Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT, - true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() { - @Override - public void onSuccess(String parentPath) { - // Create the requested path again - Futures.addCallback( - doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() { - @Override - public void onSuccess(String pathResult) { - result.set(pathResult); - } - - @Override - public void onFailure(Throwable t) { - // handle the failure - updateFailureResult(t, result, path, ignoreNodeExists); - } - }); - } - - @Override - public void onFailure(Throwable t) { - result.setException(t); - } - }); - } - - /** - * Updates the result future based on the given {@link Throwable}. - * @param t Cause of the failure - * @param result Future to be updated - * @param path Request path for the operation - * @return {@code true} if it is a failure, {@code false} otherwise. - */ - private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result, - String path, boolean ignoreNodeExists) { - // Propagate if there is error - if (!(t instanceof KeeperException)) { - result.setException(t); - return true; - } - KeeperException.Code code = ((KeeperException) t).code(); - // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation). - if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) { - // The requested path could be used because it only applies to non-sequential node - result.set(path); - return false; - } - if (code != KeeperException.Code.NONODE) { - result.setException(t); - return true; - } - return false; - } - - /** - * Gets the parent of the given path. - * @param path Path for computing its parent - * @return Parent of the given path, or empty string if the given path is the root path already. 
- */ - private String getParent(String path) { - String parentPath = path.substring(0, path.lastIndexOf('/')); - return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath; - } - }); - - return result; - } - - @Override - public OperationFuture<Stat> exists(String path, Watcher watcher) { - SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result); - return result; - } - - @Override - public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) { - SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result); - return result; - } - - @Override - public OperationFuture<NodeData> getData(String path, Watcher watcher) { - SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result); - - return result; - } - - @Override - public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) { - SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor); - getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result); - return result; - } - - @Override - public OperationFuture<String> delete(String deletePath, int version) { - SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor); - getZooKeeper().delete(deletePath, version, Callbacks.VOID, result); - return result; - } - - @Override - public OperationFuture<ACLData> getACL(String path) { - SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result); - return result; - } - - @Override - public OperationFuture<Stat> setACL(String path, Iterable<ACL> 
acl, int version) { - SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor); - getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result); - return result; - } - - @Override - public Supplier<ZooKeeper> getZooKeeperSupplier() { - return new Supplier<ZooKeeper>() { - @Override - public ZooKeeper get() { - return getZooKeeper(); - } - }; - } - - @Override - protected void doStart() { - // A single thread executor for all events - ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(), - Threads.createDaemonThreadFactory("zk-client-EventThread")); - // Just discard the execution if the executor is closed - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); - eventExecutor = executor; - - try { - zooKeeper.set(createZooKeeper()); - } catch (IOException e) { - notifyFailed(e); - } - } - - @Override - protected void doStop() { - // Submit a task to the executor to make sure all pending events in the executor are fired before - // transiting this Service into STOPPED state - eventExecutor.submit(stopTask); - eventExecutor.shutdown(); - } - - /** - * @return Current {@link ZooKeeper} client. - */ - private ZooKeeper getZooKeeper() { - ZooKeeper zk = zooKeeper.get(); - Preconditions.checkArgument(zk != null, "Not connected to zooKeeper."); - return zk; - } - - /** - * Wraps the given watcher to be called from the event executor. - * @param watcher Watcher to be wrapped - * @return The wrapped Watcher - */ - private Watcher wrapWatcher(final Watcher watcher) { - if (watcher == null) { - return null; - } - return new Watcher() { - @Override - public void process(final WatchedEvent event) { - if (eventExecutor.isShutdown()) { - LOG.debug("Already shutdown. 
Discarding event: {}", event); - return; - } - eventExecutor.execute(new Runnable() { - @Override - public void run() { - try { - watcher.process(event); - } catch (Throwable t) { - LOG.error("Watcher throws exception.", t); - } - } - }); - } - }; - } - - /** - * Creates a deep copy of the given authInfos multimap. - */ - private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) { - Multimap<String, byte[]> result = ArrayListMultimap.create(); - - for (Map.Entry<String, byte[]> entry : authInfos.entries()) { - byte[] info = entry.getValue(); - result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length)); - } - - return result; - } - - @Override - public void process(WatchedEvent event) { - State state = state(); - if (state == State.TERMINATED || state == State.FAILED) { - return; - } - - try { - if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) { - LOG.debug("Connected to ZooKeeper: {}", zkStr); - notifyStarted(); - return; - } - if (event.getState() == Event.KeeperState.Expired) { - LOG.info("ZooKeeper session expired: {}", zkStr); - - // When connection expired, simply reconnect again - if (state != State.RUNNING) { - return; - } - eventExecutor.submit(new Runnable() { - @Override - public void run() { - // Only reconnect if the current state is running - if (state() != State.RUNNING) { - return; - } - try { - LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr); - closeZooKeeper(zooKeeper.getAndSet(createZooKeeper())); - } catch (IOException e) { - notifyFailed(e); - } - } - }); - } - } finally { - if (event.getType() == Event.EventType.None) { - for (Watcher connectionWatcher : connectionWatchers) { - connectionWatcher.process(event); - } - } - } - } - - /** - * Creates a new ZooKeeper connection. 
- */ - private ZooKeeper createZooKeeper() throws IOException { - ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this)); - for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) { - zk.addAuthInfo(authInfo.getKey(), authInfo.getValue()); - } - return zk; - } - - /** - * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException, - * it will get logged. - */ - private void closeZooKeeper(@Nullable ZooKeeper zk) { - try { - if (zk != null) { - zk.close(); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted when closing ZooKeeper", e); - Thread.currentThread().interrupt(); - } - } - - /** - * Creates a {@link Runnable} task that will get executed in the event executor for transiting this - * Service into STOPPED state. - */ - private Runnable createStopTask() { - return new Runnable() { - @Override - public void run() { - try { - // Close the ZK connection in this task will make sure if there is ZK connection created - // after doStop() was called but before this task has been executed is also closed. - // It is possible to happen when the following sequence happens: - // - // 1. session expired, hence the expired event is triggered - // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client - // 3. Service.stop() gets called, Service.state() changed to STOPPING - // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one - closeZooKeeper(zooKeeper.getAndSet(null)); - notifyStopped(); - } catch (Exception e) { - notifyFailed(e); - } - } - }; - } - - /** - * Collection of generic callbacks that simply reflect results into OperationFuture. 
- */ - private static final class Callbacks { - static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, String name) { - SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set((name == null || name.isEmpty()) ? path : name); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, Stat stat) { - SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set(stat); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - /** - * A stat callback that treats NONODE as success. 
- */ - static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, Stat stat) { - SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) { - result.set(stat); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { - SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set(new BasicNodeChildren(children, stat)); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set(new BasicNodeData(data, stat)); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx) { - SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx; - KeeperException.Code code 
= KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set(result.getRequestPath()); - return; - } - // Otherwise, it is an error - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - - static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() { - @Override - @SuppressWarnings("unchecked") - public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) { - SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx; - KeeperException.Code code = KeeperException.Code.get(rc); - if (code == KeeperException.Code.OK) { - result.set(new BasicACLData(acl, stat)); - return; - } - result.setException(KeeperException.create(code, result.getRequestPath())); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java new file mode 100644 index 0000000..64fd7ed --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.base.Charsets; +import com.google.common.base.Objects; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.UnsignedBytes; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable} + * implementations. + */ +public abstract class AbstractTransactionAwareTable implements TransactionAware { + protected final TransactionCodec txCodec; + // map of write pointers to change set associated with each + protected final Map<Long, Set<ActionChange>> changeSets; + protected final TxConstants.ConflictDetection conflictLevel; + protected Transaction tx; + protected boolean allowNonTransactional; + + public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) { + this.conflictLevel = conflictLevel; + this.allowNonTransactional = allowNonTransactional; + this.txCodec = new TransactionCodec(); + this.changeSets = Maps.newHashMap(); + } + + /** + * True if the instance allows non-transaction operations. + * @return + */ + public boolean getAllowNonTransactional() { + return this.allowNonTransactional; + } + + /** + * Set whether the instance allows non-transactional operations. 
+ * @param allowNonTransactional + */ + public void setAllowNonTransactional(boolean allowNonTransactional) { + this.allowNonTransactional = allowNonTransactional; + } + + @Override + public void startTx(Transaction tx) { + this.tx = tx; + } + + @Override + public void updateTx(Transaction tx) { + this.tx = tx; + } + + @Override + public Collection<byte[]> getTxChanges() { + if (conflictLevel == TxConstants.ConflictDetection.NONE) { + return Collections.emptyList(); + } + + Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator()); + for (Set<ActionChange> changeSet : changeSets.values()) { + for (ActionChange change : changeSet) { + txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier())); + } + } + return txChanges; + } + + public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) { + byte[] key; + switch (conflictLevel) { + case ROW: + key = Bytes.concat(getTableKey(), row); + break; + case COLUMN: + key = Bytes.concat(getTableKey(), row, family, qualifier); + break; + case NONE: + throw new IllegalStateException("NONE conflict detection does not support change keys"); + default: + throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); + } + return key; + } + + @Override + public boolean commitTx() throws Exception { + return doCommit(); + } + + /** + * Commits any pending writes by flushing the wrapped {@code HTable} instance. + */ + protected abstract boolean doCommit() throws IOException; + + @Override + public void postTxCommit() { + tx = null; + changeSets.clear(); + } + + @Override + public String getTransactionAwareName() { + return new String(getTableKey(), Charsets.UTF_8); + } + + /** + * Returns the table name to use as a key prefix for the transaction change set. 
+ */ + protected abstract byte[] getTableKey(); + + @Override + public boolean rollbackTx() throws Exception { + return doRollback(); + } + + /** + * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the + * wrapped {@code HTable} instance. How this is handled will depend on the delete API exposed + * by the specific version of HBase. + */ + protected abstract boolean doRollback() throws Exception; + + protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) { + long currentWritePointer = tx.getWritePointer(); + Set<ActionChange> changeSet = changeSets.get(currentWritePointer); + if (changeSet == null) { + changeSet = Sets.newHashSet(); + changeSets.put(currentWritePointer, changeSet); + } + switch (conflictLevel) { + case ROW: + case NONE: + // with ROW or NONE conflict detection, we still need to track changes per-family, since this + // is the granularity at which we will issue deletes for rollback + changeSet.add(new ActionChange(row, family)); + break; + case COLUMN: + changeSet.add(new ActionChange(row, family, qualifier)); + break; + default: + throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); + } + } + + /** + * Record of each transaction that causes a change. This reference is used to rollback + * any operation upon failure. 
+ */ + protected class ActionChange { + private final byte[] row; + private final byte[] family; + private final byte[] qualifier; + + public ActionChange(byte[] row, byte[] family) { + this(row, family, null); + } + + public ActionChange(byte[] row, byte[] family, byte[] qualifier) { + this.row = row; + this.family = family; + this.qualifier = qualifier; + } + + public byte[] getRow() { + return row; + } + + public byte[] getFamily() { + return family; + } + + public byte[] getQualifier() { + return qualifier; + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; + } + + if (o == this) { + return true; + } + + ActionChange other = (ActionChange) o; + return Objects.equal(this.row, other.row) && + Objects.equal(this.family, other.family) && + Objects.equal(this.qualifier, other.qualifier); + } + + @Override + public int hashCode() { + int result = Arrays.hashCode(row); + result = 31 * result + (family != null ? Arrays.hashCode(family) : 0); + result = 31 * result + (qualifier != null ? Arrays.hashCode(qualifier) : 0); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java new file mode 100644 index 0000000..528085f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +/** + * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution + * to respective synchronous methods via provided {@link ExecutorService}. 
+ */ +public abstract class AbstractTransactionExecutor implements TransactionExecutor { + private final ListeningExecutorService executorService; + + protected AbstractTransactionExecutor(ExecutorService executorService) { + this.executorService = MoreExecutors.listeningDecorator(executorService); + } + + @Override + public <I, O> O executeUnchecked(Function<I, O> function, I input) { + try { + return execute(function, input); + } catch (TransactionFailureException e) { + throw Throwables.propagate(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public <I> void executeUnchecked(Procedure<I> procedure, I input) { + try { + execute(procedure, input); + } catch (TransactionFailureException e) { + throw Throwables.propagate(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public <O> O executeUnchecked(Callable<O> callable) { + try { + return execute(callable); + } catch (TransactionFailureException e) { + throw Throwables.propagate(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public void executeUnchecked(Subroutine subroutine) { + try { + execute(subroutine); + } catch (TransactionFailureException e) { + throw Throwables.propagate(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + } + + @Override + public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) { + return executorService.submit(new Callable<O>() { + @Override + public O call() throws Exception { + return execute(function, input); + } + }); + } + + @Override + public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) { + return executorService.submit(new Callable<Object>() { + @Override + public I call() throws Exception { + 
execute(procedure, input); + return null; + } + }); + } + + @Override + public <O> ListenableFuture<O> submit(final Callable<O> callable) { + return executorService.submit(new Callable<O>() { + @Override + public O call() throws Exception { + return execute(callable); + } + }); + } + + @Override + public ListenableFuture<?> submit(final Subroutine subroutine) { + return executorService.submit(new Callable<Object>() { + @Override + public Object call() throws Exception { + execute(subroutine); + return null; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/ChangeId.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/ChangeId.java b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java new file mode 100644 index 0000000..0eb7191 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import java.util.Arrays; + +/** + * Represents a row key from a data set changed as part of a transaction. 
+ */ +public final class ChangeId { + private final byte[] key; + private final int hash; + + public ChangeId(byte[] bytes) { + key = bytes; + hash = Arrays.hashCode(bytes); + } + + public byte[] getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null || o.getClass() != ChangeId.class) { + return false; + } + ChangeId other = (ChangeId) o; + return hash == other.hash && Arrays.equals(key, other.key); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + return toStringBinary(key, 0, key.length); + } + + // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes. + private String toStringBinary(byte [] b, int off, int len) { + StringBuilder result = new StringBuilder(); + for (int i = off; i < off + len; ++i) { + int ch = b[i] & 0xFF; + if ((ch >= '0' && ch <= '9') + || (ch >= 'A' && ch <= 'Z') + || (ch >= 'a' && ch <= 'z') + || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) { + result.append((char) ch); + } else { + result.append(String.format("\\x%02X", ch)); + } + } + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java new file mode 100644 index 0000000..c5e1cb3 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Utility class that encapsulates the transaction life cycle over a given set of + * transaction-aware datasets. The executor can be reused across multiple invocations + * of the execute() method. However, it is not thread-safe for concurrent execution. + * <p> + * Transaction execution will be retries according to specified in constructor {@link RetryStrategy}. + * By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries. + * </p> + */ +public class DefaultTransactionExecutor extends AbstractTransactionExecutor { + + private final Collection<TransactionAware> txAwares; + private final TransactionSystemClient txClient; + private final RetryStrategy retryStrategy; + + /** + * Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)} + */ + public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... 
txAwares) { + this(txClient, Arrays.asList(txAwares)); + } + + + public DefaultTransactionExecutor(TransactionSystemClient txClient, + Iterable<TransactionAware> txAwares, + RetryStrategy retryStrategy) { + + super(MoreExecutors.sameThreadExecutor()); + this.txAwares = ImmutableList.copyOf(txAwares); + this.txClient = txClient; + this.retryStrategy = retryStrategy; + } + + /** + * Constructor for a transaction executor. + */ + @Inject + public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) { + this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100)); + } + + @Override + public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException { + return executeWithRetry(function, input); + } + + @Override + public <I> void execute(final Procedure<I> procedure, I input) + throws TransactionFailureException, InterruptedException { + + execute(new Function<I, Void>() { + @Override + public Void apply(I input) throws Exception { + procedure.apply(input); + return null; + } + }, input); + } + + @Override + public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException { + return execute(new Function<Void, O>() { + @Override + public O apply(Void input) throws Exception { + return callable.call(); + } + }, null); + } + + @Override + public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException { + execute(new Function<Void, Void>() { + @Override + public Void apply(Void input) throws Exception { + subroutine.apply(); + return null; + } + }, null); + } + + private <I, O> O executeWithRetry(Function<I, O> function, I input) + throws TransactionFailureException, InterruptedException { + + int retries = 0; + while (true) { + try { + return executeOnce(function, input); + } catch (TransactionFailureException e) { + long delay = retryStrategy.nextRetry(e, ++retries); + + if (delay 
< 0) { + throw e; + } + + if (delay > 0) { + TimeUnit.MILLISECONDS.sleep(delay); + } + } + } + + } + + private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException { + TransactionContext txContext = new TransactionContext(txClient, txAwares); + txContext.start(); + O o = null; + try { + o = function.apply(input); + } catch (Throwable e) { + txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e)); + // abort will throw + } + // will throw if smth goes wrong + txContext.finish(); + return o; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java new file mode 100644 index 0000000..1cdbfb0 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Thrown when truncate invalid list is called with a time, and when there are in-progress
 * transactions that were started before the given time.
 */
public class InvalidTruncateTimeException extends Exception {

  /**
   * @param message the detail message, later available through {@link #getMessage()}
   */
  public InvalidTruncateTimeException(String message) {
    super(message);
  }
}
+ */ + +package org.apache.tephra; + +/** + * Does no retries + */ +public class NoRetryStrategy implements RetryStrategy { + public static final RetryStrategy INSTANCE = new NoRetryStrategy(); + + private NoRetryStrategy() {} + + @Override + public long nextRetry(TransactionFailureException reason, int failureCount) { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java new file mode 100644 index 0000000..82f069f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +/** + * Retries transaction execution when transaction fails with {@link TransactionConflictException}. 
+ */ +public class RetryOnConflictStrategy implements RetryStrategy { + private final int maxRetries; + private final long retryDelay; + + public RetryOnConflictStrategy(int maxRetries, long retryDelay) { + this.maxRetries = maxRetries; + this.retryDelay = retryDelay; + } + + @Override + public long nextRetry(TransactionFailureException reason, int failureCount) { + if (reason instanceof TransactionConflictException) { + return failureCount > maxRetries ? -1 : retryDelay; + } else { + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java new file mode 100644 index 0000000..9550d77 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +/** + * Collection of {@link RetryStrategy}s. 
+ */ +public final class RetryStrategies { + private RetryStrategies() {} + + /** + * @param maxRetries max number of retries + * @param delayInMs delay between retries in milliseconds + * @return RetryStrategy that retries transaction execution when transaction fails with + * {@link TransactionConflictException} + */ + public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) { + return new RetryOnConflictStrategy(maxRetries, delayInMs); + } + + public static RetryStrategy noRetries() { + return NoRetryStrategy.INSTANCE; + } +}
