http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java deleted file mode 100644 index 11a5b55..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionAwareTable.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import com.google.common.base.Charsets; -import com.google.common.base.Objects; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.UnsignedBytes; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -/** - * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable} - * implementations. 
- */ -public abstract class AbstractTransactionAwareTable implements TransactionAware { - protected final TransactionCodec txCodec; - // map of write pointers to change set associated with each - protected final Map<Long, Set<ActionChange>> changeSets; - protected final TxConstants.ConflictDetection conflictLevel; - protected Transaction tx; - protected boolean allowNonTransactional; - - public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) { - this.conflictLevel = conflictLevel; - this.allowNonTransactional = allowNonTransactional; - this.txCodec = new TransactionCodec(); - this.changeSets = Maps.newHashMap(); - } - - /** - * True if the instance allows non-transaction operations. - * @return - */ - public boolean getAllowNonTransactional() { - return this.allowNonTransactional; - } - - /** - * Set whether the instance allows non-transactional operations. - * @param allowNonTransactional - */ - public void setAllowNonTransactional(boolean allowNonTransactional) { - this.allowNonTransactional = allowNonTransactional; - } - - @Override - public void startTx(Transaction tx) { - this.tx = tx; - } - - @Override - public void updateTx(Transaction tx) { - this.tx = tx; - } - - @Override - public Collection<byte[]> getTxChanges() { - if (conflictLevel == TxConstants.ConflictDetection.NONE) { - return Collections.emptyList(); - } - - Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator()); - for (Set<ActionChange> changeSet : changeSets.values()) { - for (ActionChange change : changeSet) { - txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier())); - } - } - return txChanges; - } - - public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) { - byte[] key; - switch (conflictLevel) { - case ROW: - key = Bytes.concat(getTableKey(), row); - break; - case COLUMN: - key = Bytes.concat(getTableKey(), row, family, qualifier); - break; - 
case NONE: - throw new IllegalStateException("NONE conflict detection does not support change keys"); - default: - throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); - } - return key; - } - - @Override - public boolean commitTx() throws Exception { - return doCommit(); - } - - /** - * Commits any pending writes by flushing the wrapped {@code HTable} instance. - */ - protected abstract boolean doCommit() throws IOException; - - @Override - public void postTxCommit() { - tx = null; - changeSets.clear(); - } - - @Override - public String getTransactionAwareName() { - return new String(getTableKey(), Charsets.UTF_8); - } - - /** - * Returns the table name to use as a key prefix for the transaction change set. - */ - protected abstract byte[] getTableKey(); - - @Override - public boolean rollbackTx() throws Exception { - return doRollback(); - } - - /** - * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the - * wrapped {@code HTable} instance. How this is handled will depend on the delete API exposed - * by the specific version of HBase. 
- */ - protected abstract boolean doRollback() throws Exception; - - protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) { - long currentWritePointer = tx.getWritePointer(); - Set<ActionChange> changeSet = changeSets.get(currentWritePointer); - if (changeSet == null) { - changeSet = Sets.newHashSet(); - changeSets.put(currentWritePointer, changeSet); - } - switch (conflictLevel) { - case ROW: - case NONE: - // with ROW or NONE conflict detection, we still need to track changes per-family, since this - // is the granularity at which we will issue deletes for rollback - changeSet.add(new ActionChange(row, family)); - break; - case COLUMN: - changeSet.add(new ActionChange(row, family, qualifier)); - break; - default: - throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); - } - } - - /** - * Record of each transaction that causes a change. This reference is used to rollback - * any operation upon failure. - */ - protected class ActionChange { - private final byte[] row; - private final byte[] family; - private final byte[] qualifier; - - public ActionChange(byte[] row, byte[] family) { - this(row, family, null); - } - - public ActionChange(byte[] row, byte[] family, byte[] qualifier) { - this.row = row; - this.family = family; - this.qualifier = qualifier; - } - - public byte[] getRow() { - return row; - } - - public byte[] getFamily() { - return family; - } - - public byte[] getQualifier() { - return qualifier; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != this.getClass()) { - return false; - } - - if (o == this) { - return true; - } - - ActionChange other = (ActionChange) o; - return Objects.equal(this.row, other.row) && - Objects.equal(this.family, other.family) && - Objects.equal(this.qualifier, other.qualifier); - } - - @Override - public int hashCode() { - int result = Arrays.hashCode(row); - result = 31 * result + (family != null ? 
Arrays.hashCode(family) : 0); - result = 31 * result + (qualifier != null ? Arrays.hashCode(qualifier) : 0); - return result; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java deleted file mode 100644 index f2a28a8..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/AbstractTransactionExecutor.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; - -/** - * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution - * to respective synchronous methods via provided {@link ExecutorService}. 
- */ -public abstract class AbstractTransactionExecutor implements TransactionExecutor { - private final ListeningExecutorService executorService; - - protected AbstractTransactionExecutor(ExecutorService executorService) { - this.executorService = MoreExecutors.listeningDecorator(executorService); - } - - @Override - public <I, O> O executeUnchecked(Function<I, O> function, I input) { - try { - return execute(function, input); - } catch (TransactionFailureException e) { - throw Throwables.propagate(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - } - - @Override - public <I> void executeUnchecked(Procedure<I> procedure, I input) { - try { - execute(procedure, input); - } catch (TransactionFailureException e) { - throw Throwables.propagate(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - } - - @Override - public <O> O executeUnchecked(Callable<O> callable) { - try { - return execute(callable); - } catch (TransactionFailureException e) { - throw Throwables.propagate(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - } - - @Override - public void executeUnchecked(Subroutine subroutine) { - try { - execute(subroutine); - } catch (TransactionFailureException e) { - throw Throwables.propagate(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw Throwables.propagate(e); - } - } - - @Override - public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) { - return executorService.submit(new Callable<O>() { - @Override - public O call() throws Exception { - return execute(function, input); - } - }); - } - - @Override - public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) { - return executorService.submit(new Callable<Object>() { - @Override - public I call() throws Exception { - 
execute(procedure, input); - return null; - } - }); - } - - @Override - public <O> ListenableFuture<O> submit(final Callable<O> callable) { - return executorService.submit(new Callable<O>() { - @Override - public O call() throws Exception { - return execute(callable); - } - }); - } - - @Override - public ListenableFuture<?> submit(final Subroutine subroutine) { - return executorService.submit(new Callable<Object>() { - @Override - public Object call() throws Exception { - execute(subroutine); - return null; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/ChangeId.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java b/tephra-core/src/main/java/co/cask/tephra/ChangeId.java deleted file mode 100644 index 8413256..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/ChangeId.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import java.util.Arrays; - -/** - * Represents a row key from a data set changed as part of a transaction. 
/**
 * Represents a row key from a data set changed as part of a transaction.
 *
 * <p>Equality and hash code are based on the contents of the key. The hash is computed once in the
 * constructor and the array is stored and returned without copying, so callers must not modify it
 * after handing it over. A {@code null} key is accepted; all methods, including
 * {@link #toString()}, handle it.
 */
public final class ChangeId {
  private final byte[] key;
  private final int hash;

  /**
   * @param bytes the change key; stored by reference, may be {@code null}
   */
  public ChangeId(byte[] bytes) {
    key = bytes;
    hash = Arrays.hashCode(bytes);
  }

  /**
   * @return the key array passed to the constructor (not a copy)
   */
  public byte[] getKey() {
    return key;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (o == null || o.getClass() != ChangeId.class) {
      return false;
    }
    ChangeId other = (ChangeId) o;
    // cheap cached-hash comparison first, full content comparison only on a hash match
    return hash == other.hash && Arrays.equals(key, other.key);
  }

  @Override
  public int hashCode() {
    return hash;
  }

  /**
   * Renders the key with letters, digits, space and common punctuation shown as-is and every
   * other byte as {@code \xNN}; returns {@code "null"} for a {@code null} key.
   */
  @Override
  public String toString() {
    if (key == null) {
      return "null";
    }
    return toStringBinary(key, 0, key.length);
  }

  // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes.
  private static String toStringBinary(byte [] b, int off, int len) {
    StringBuilder result = new StringBuilder();
    for (int i = off; i < off + len; ++i) {
      int ch = b[i] & 0xFF;
      if ((ch >= '0' && ch <= '9')
        || (ch >= 'A' && ch <= 'Z')
        || (ch >= 'a' && ch <= 'z')
        || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
        result.append((char) ch);
      } else {
        result.append(String.format("\\x%02X", ch));
      }
    }
    return result.toString();
  }
}
// Patch metadata for the next file in this diff, carried over unchanged from the original line:
// http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java deleted file mode 100644 index f85dd6b..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/DefaultTransactionExecutor.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
// Remainder of the license header begun on the previous line, carried over unchanged:
// The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package co.cask.tephra;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Utility class that encapsulates the transaction life cycle over a given set of
 * transaction-aware datasets. The executor can be reused across multiple invocations
 * of the execute() method. However, it is not thread-safe for concurrent execution.
 * <p>
 * Transaction execution is retried according to the {@link RetryStrategy} given to the constructor.
 * By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
 * </p>
 */
public class DefaultTransactionExecutor extends AbstractTransactionExecutor {

  private final Collection<TransactionAware> txAwares;
  private final TransactionSystemClient txClient;
  private final RetryStrategy retryStrategy;

  /**
   * Convenience constructor, has same effect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
   */
  public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
    this(txClient, Arrays.asList(txAwares));
  }

  /**
   * Constructor taking an explicit retry strategy. The given transaction-awares are copied
   * into an immutable list.
   */
  public DefaultTransactionExecutor(TransactionSystemClient txClient,
                                    Iterable<TransactionAware> txAwares,
                                    RetryStrategy retryStrategy) {

    super(MoreExecutors.sameThreadExecutor());
    this.txAwares = ImmutableList.copyOf(txAwares);
    this.txClient = txClient;
    this.retryStrategy = retryStrategy;
  }

  /**
   * Constructor for a transaction executor.
   */
  @Inject
  public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
    this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
  }

  @Override
  public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
    return executeWithRetry(function, input);
  }

  @Override
  public <I> void execute(final Procedure<I> procedure, I input)
    throws TransactionFailureException, InterruptedException {

    // adapt the procedure to a function that yields no value
    Function<I, Void> asFunction = new Function<I, Void>() {
      @Override
      public Void apply(I input) throws Exception {
        procedure.apply(input);
        return null;
      }
    };
    execute(asFunction, input);
  }

  @Override
  public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
    // adapt the callable to a function that ignores its (null) input
    Function<Void, O> asFunction = new Function<Void, O>() {
      @Override
      public O apply(Void input) throws Exception {
        return callable.call();
      }
    };
    return execute(asFunction, null);
  }

  @Override
  public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
    // adapt the subroutine to a function with neither input nor result
    Function<Void, Void> asFunction = new Function<Void, Void>() {
      @Override
      public Void apply(Void input) throws Exception {
        subroutine.apply();
        return null;
      }
    };
    execute(asFunction, null);
  }

  /**
   * Runs the function in a fresh transaction, repeating after each failure for as long as the
   * retry strategy returns a non-negative delay; a negative delay rethrows the failure.
   */
  private <I, O> O executeWithRetry(Function<I, O> function, I input)
    throws TransactionFailureException, InterruptedException {

    // failureCount handed to the strategy starts at 1 for the first failure
    for (int failures = 1; ; failures++) {
      try {
        return executeOnce(function, input);
      } catch (TransactionFailureException failure) {
        long waitMillis = retryStrategy.nextRetry(failure, failures);
        if (waitMillis < 0) {
          throw failure;
        }
        if (waitMillis > 0) {
          TimeUnit.MILLISECONDS.sleep(waitMillis);
        }
      }
    }
  }

  /**
   * Runs the function once inside a new {@link TransactionContext}: start, apply, finish.
   * Anything thrown by the function aborts the transaction.
   */
  private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
    TransactionContext context = new TransactionContext(txClient, txAwares);
    context.start();
    O result = null;
    try {
      result = function.apply(input);
    } catch (Throwable cause) {
      // abort will throw
      context.abort(new TransactionFailureException("Transaction function failure for transaction. ", cause));
    }
    // will throw if something goes wrong
    context.finish();
    return result;
  }
}
// Patch metadata for the next file in this diff, carried over unchanged from the original line:
// http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java deleted file mode 100644 index b4576ee..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/InvalidTruncateTimeException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package co.cask.tephra; - -/** - * Thrown when truncate invalid list is called with a time, and when there are in-progress transactions that - * were started before the given time. - */ -public class InvalidTruncateTimeException extends Exception { - public InvalidTruncateTimeException(String s) { - super(s); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java deleted file mode 100644 index 30e9ceb..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/NoRetryStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -/** - * Does no retries - */ -public class NoRetryStrategy implements RetryStrategy { - public static final RetryStrategy INSTANCE = new NoRetryStrategy(); - - private NoRetryStrategy() {} - - @Override - public long nextRetry(TransactionFailureException reason, int failureCount) { - return -1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java deleted file mode 100644 index 04b9f75..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/RetryOnConflictStrategy.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -/** - * Retries transaction execution when transaction fails with {@link TransactionConflictException}. 
- */ -public class RetryOnConflictStrategy implements RetryStrategy { - private final int maxRetries; - private final long retryDelay; - - public RetryOnConflictStrategy(int maxRetries, long retryDelay) { - this.maxRetries = maxRetries; - this.retryDelay = retryDelay; - } - - @Override - public long nextRetry(TransactionFailureException reason, int failureCount) { - if (reason instanceof TransactionConflictException) { - return failureCount > maxRetries ? -1 : retryDelay; - } else { - return -1; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java deleted file mode 100644 index d1de0e5..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategies.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -/** - * Collection of {@link RetryStrategy}s. 
- */ -public final class RetryStrategies { - private RetryStrategies() {} - - /** - * @param maxRetries max number of retries - * @param delayInMs delay between retries in milliseconds - * @return RetryStrategy that retries transaction execution when transaction fails with - * {@link TransactionConflictException} - */ - public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) { - return new RetryOnConflictStrategy(maxRetries, delayInMs); - } - - public static RetryStrategy noRetries() { - return NoRetryStrategy.INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java deleted file mode 100644 index e651d0e..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/RetryStrategy.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package co.cask.tephra; - -/** - * Retry strategy for failed transactions - */ -public interface RetryStrategy { - /** - * Returns the number of milliseconds to wait before retrying the operation. - * - * @param reason Reason for transaction failure. - * @param failureCount Number of times that the request has been failed. - * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means - * retry it immediately, while negative means abort the operation. - */ - long nextRetry(TransactionFailureException reason, int failureCount); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java b/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java deleted file mode 100644 index 25ccbfb..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionAdmin.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package co.cask.tephra;

import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionClientModule;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.runtime.ZKModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.zookeeper.ZKClientService;

import java.io.PrintStream;
import java.util.List;
import java.util.Set;

/**
 * Allows calling some methods on {@link TransactionManager} from command line.
 *
 * <p>Regular output goes to the first stream given to the constructor, errors and usage to the
 * second. {@link #doMain} returns {@code 0} on success and {@code 1} on any usage or input error.
 */
public class TransactionAdmin {
  private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx";
  private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before";
  private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size";

  private final PrintStream out;
  private final PrintStream err;

  public static void main(String[] args) {
    int exitCode = new TransactionAdmin(System.out, System.err).doMain(args, new Configuration());
    System.exit(exitCode);
  }

  public TransactionAdmin(PrintStream out, PrintStream err) {
    this.out = out;
    this.err = err;
  }

  /**
   * Starts the ZooKeeper client, runs the command named by {@code args[0]} and always stops the
   * client again before returning.
   *
   * @return process exit status: {@code 0} on success, {@code 1} otherwise
   */
  @VisibleForTesting
  int doMain(String[] args, Configuration conf) {
    if (args.length < 1) {
      printUsage();
      return 1;
    }

    Injector injector = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      new TransactionModules().getDistributedModules(),
      new TransactionClientModule()
    );

    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
    zkClient.startAndWait();

    try {
      TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class);
      return runCommand(txClient, args);
    } finally {
      zkClient.stopAndWait();
    }
  }

  /**
   * Dispatches on the option in {@code args[0]} and executes it against the given client.
   */
  private int runCommand(TransactionSystemClient txClient, String[] args) {
    String option = args[0];

    if (option.equals(OPT_TRUNCATE_INVALID_TX)) {
      if (args.length != 2) {
        printUsage();
        return 1;
      }
      Set<Long> txIds;
      try {
        txIds = parseTxIds(args[1]);
      } catch (NumberFormatException e) {
        err.println("NumberFormatException: " + e.getMessage());
        return 1;
      }
      if (!txIds.isEmpty()) {
        out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
        txClient.truncateInvalidTx(txIds);
        out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
      }
      return 0;
    }

    if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) {
      if (args.length != 2) {
        printUsage();
        return 1;
      }
      try {
        long time = Long.parseLong(args[1]);
        out.println("Invalid list size before truncation: " + txClient.getInvalidSize());
        txClient.truncateInvalidTxBefore(time);
        out.println("Invalid list size after truncation: " + txClient.getInvalidSize());
      } catch (InvalidTruncateTimeException e) {
        err.println(e.getMessage());
        return 1;
      } catch (NumberFormatException e) {
        err.println("NumberFormatException: " + e.getMessage());
        return 1;
      }
      return 0;
    }

    if (option.equals(OPT_GET_INVALID_TX_SIZE)) {
      if (args.length != 1) {
        printUsage();
        return 1;
      }
      out.println("Invalid list size: " + txClient.getInvalidSize());
      return 0;
    }

    // unrecognized option
    printUsage();
    return 1;
  }

  /**
   * Parses a comma-separated list of transaction ids into a set.
   */
  private Set<Long> parseTxIds(String option) throws NumberFormatException {
    Set<Long> parsed = Sets.newHashSet();
    for (String token : Splitter.on(',').split(option)) {
      parsed.add(Long.parseLong(token));
    }
    return parsed;
  }

  /**
   * Prints one usage line per supported option to the error stream.
   */
  private void printUsage() {
    String programName = TransactionAdmin.class.getSimpleName();
    // NOTE(review): shown as a single space in this view; the view has collapsed whitespace,
    // so the original indent may have been wider - confirm against the real source.
    String spaces = " ";
    List<String> lines = Lists.newArrayList();
    lines.add(join("Usage: "));
    lines.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>"));
    lines.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>"));
    lines.add(join(spaces, programName, OPT_GET_INVALID_TX_SIZE));

    err.println(Joiner.on(System.getProperty("line.separator")).join(lines));
  }

  private static String join(String... args) {
    return Joiner.on(" ").join(args);
  }
}
// Patch metadata for the next file in this diff, carried over unchanged from the original line:
// http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java deleted file mode 100644 index 403b1ec..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionCodec.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package co.cask.tephra; - -import co.cask.tephra.distributed.TransactionConverterUtils; -import co.cask.tephra.distributed.thrift.TTransaction; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; - -import java.io.IOException; - -/** - * Handles serialization and deserialization of {@link co.cask.tephra.Transaction} instances to and from {@code byte[]}. - */ -public class TransactionCodec { - - public TransactionCodec() { - } - - public byte[] encode(Transaction tx) throws IOException { - TTransaction thriftTx = TransactionConverterUtils.wrap(tx); - TSerializer serializer = new TSerializer(); - try { - return serializer.serialize(thriftTx); - } catch (TException te) { - throw new IOException(te); - } - } - - public Transaction decode(byte[] encoded) throws IOException { - TTransaction thriftTx = new TTransaction(); - TDeserializer deserializer = new TDeserializer(); - try { - deserializer.deserialize(thriftTx, encoded); - return TransactionConverterUtils.unwrap(thriftTx); - } catch (TException te) { - throw new IOException(te); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java b/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java deleted file mode 100644 index 5b31ebe..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionContext.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import javax.annotation.Nullable; - -/** - * Utility class that encapsulates the transaction life cycle over a given set of - * transaction-aware datasets. It is not thread-safe for concurrent execution. - */ -public class TransactionContext { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); - - private final Collection<TransactionAware> txAwares; - private final TransactionSystemClient txClient; - - private Transaction currentTx; - - public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) { - this(txClient, ImmutableList.copyOf(txAwares)); - } - - public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) { - // Use a set to avoid adding the same TransactionAware twice and to make removal faster. - // Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List). - this.txAwares = Sets.newLinkedHashSet(txAwares); - this.txClient = txClient; - } - - /** - * Adds a new transaction-aware to participate in the transaction. 
- * @param txAware the new transaction-aware - */ - public boolean addTransactionAware(TransactionAware txAware) { - // If the txAware is newly added, call startTx as well if there is an active transaction - boolean added = txAwares.add(txAware); - if (added && currentTx != null) { - txAware.startTx(currentTx); - } - return added; - } - - /** - * Removes a {@link TransactionAware} and withdraws from participation in the transaction. - * Withdrawal is only allowed if there is no active transaction. - * - * @param txAware the {@link TransactionAware} to be removed - * @return true if the given {@link TransactionAware} is removed; false otherwise. - * @throws IllegalStateException if there is an active transaction going on with this TransactionContext. - */ - public boolean removeTransactionAware(TransactionAware txAware) { - Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction."); - return txAwares.remove(txAware); - } - - /** - * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient}, - * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered - * TransactionAware. If an exception is encountered, the transaction will be aborted and a - * {@code TransactionFailureException} wrapping the root cause will be thrown. - * - * @throws TransactionFailureException if an exception occurs starting the transaction with any registered - * TransactionAware - */ - public void start() throws TransactionFailureException { - currentTx = txClient.startShort(); - for (TransactionAware txAware : txAwares) { - try { - txAware.startTx(currentTx); - } catch (Throwable e) { - String message = String.format("Unable to start transaction-aware '%s' for transaction %d. 
", - txAware.getTransactionAwareName(), currentTx.getTransactionId()); - LOG.warn(message, e); - txClient.abort(currentTx); - throw new TransactionFailureException(message, e); - } - } - } - - /** - * Commits the current transaction. This will: check for any conflicts, based on the change set aggregated from - * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s; - * commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state. - * - * @throws TransactionConflictException if a conflict is detected with a recently committed transaction - * @throws TransactionFailureException if an error occurs while committing - */ - public void finish() throws TransactionFailureException { - Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started"); - // each of these steps will abort and rollback the tx in case if errors, and throw an exception - checkForConflicts(); - persist(); - commit(); - postCommit(); - currentTx = null; - } - - /** - * Aborts the given transaction, and rolls back all data set changes. If rollback fails, - * the transaction is invalidated. If an exception is caught during rollback, the exception - * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have - * completed rollback. - * - * @throws TransactionFailureException for any exception that is encountered. - */ - public void abort() throws TransactionFailureException { - abort(null); - } - - /** - * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware} - * instances, and obtaining a new current write pointer for the transaction. By performing a checkpoint, - * the client can ensure that all previous writes were flushed and are visible. By default, the current write - * pointer for the transaction is also visible. 
The current write pointer can be excluded from read - * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set - * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created - * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}. - * - * After the checkpoint operation is performed, the updated - * {@link Transaction} instance will be passed to {@link TransactionAware#startTx(Transaction)} for each - * registered {@code TransactionAware} instance. - * - * @throws TransactionFailureException if an error occurs while performing the checkpoint - */ - public void checkpoint() throws TransactionFailureException { - Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started"); - persist(); - try { - currentTx = txClient.checkpoint(currentTx); - // update the current transaction with all TransactionAwares - for (TransactionAware txAware : txAwares) { - txAware.updateTx(currentTx); - } - } catch (TransactionNotInProgressException e) { - String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } catch (Throwable e) { - String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } - } - - /** - * Returns the current transaction or null if no transaction is currently in progress. - */ - @Nullable - public Transaction getCurrentTransaction() { - return currentTx; - } - - // CHECKSTYLE IGNORE "@throws" FOR 11 LINES - /** - * Aborts the given transaction, and rolls back all data set changes. If rollback fails, - * the transaction is invalidated. 
If an exception is caught during rollback, the exception - * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have - * completed rollback. If an existing exception is passed in, that exception is thrown in either - * case, whether the rollback is successful or not. In other words, this method always throws the - * first exception that it encounters. - * @param cause the original exception that caused the abort - * @throws TransactionFailureException for any exception that is encountered. - */ - public void abort(TransactionFailureException cause) throws TransactionFailureException { - if (currentTx == null) { - // might be called by some generic exception handler even though already aborted/finished - we allow that - return; - } - try { - boolean success = true; - for (TransactionAware txAware : txAwares) { - try { - if (!txAware.rollbackTx()) { - success = false; - } - } catch (Throwable e) { - String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ", - txAware.getTransactionAwareName(), currentTx.getTransactionId()); - LOG.warn(message, e); - if (cause == null) { - cause = new TransactionFailureException(message, e); - } - success = false; - } - } - if (success) { - txClient.abort(currentTx); - } else { - txClient.invalidate(currentTx.getTransactionId()); - } - if (cause != null) { - throw cause; - } - } finally { - currentTx = null; - } - } - - private void checkForConflicts() throws TransactionFailureException { - Collection<byte[]> changes = Lists.newArrayList(); - for (TransactionAware txAware : txAwares) { - try { - changes.addAll(txAware.getTxChanges()); - } catch (Throwable e) { - String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. 
", - txAware.getTransactionAwareName(), currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } - } - - boolean canCommit = false; - try { - canCommit = txClient.canCommit(currentTx, changes); - } catch (TransactionNotInProgressException e) { - String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } catch (Throwable e) { - String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } - if (!canCommit) { - String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); - abort(new TransactionConflictException(message)); - // abort will throw - } - } - - private void persist() throws TransactionFailureException { - for (TransactionAware txAware : txAwares) { - boolean success; - Throwable cause = null; - try { - success = txAware.commitTx(); - } catch (Throwable e) { - success = false; - cause = e; - } - if (!success) { - String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. 
", - txAware.getTransactionAwareName(), currentTx.getTransactionId()); - if (cause == null) { - LOG.warn(message); - } else { - LOG.warn(message, cause); - } - abort(new TransactionFailureException(message, cause)); - // abort will throw that exception - } - } - } - - private void commit() throws TransactionFailureException { - boolean commitSuccess = false; - try { - commitSuccess = txClient.commit(currentTx); - } catch (TransactionNotInProgressException e) { - String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } catch (Throwable e) { - String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId()); - LOG.warn(message, e); - abort(new TransactionFailureException(message, e)); - // abort will throw that exception - } - if (!commitSuccess) { - String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); - abort(new TransactionConflictException(message)); - // abort will throw - } - } - - private void postCommit() throws TransactionFailureException { - TransactionFailureException cause = null; - for (TransactionAware txAware : txAwares) { - try { - txAware.postTxCommit(); - } catch (Throwable e) { - String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. 
", - txAware.getTransactionAwareName(), currentTx.getTransactionId()); - LOG.warn(message, e); - cause = new TransactionFailureException(message, e); - } - } - if (cause != null) { - throw cause; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java deleted file mode 100644 index a98be14..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionCouldNotTakeSnapshotException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -/** - * Throw when taking a snapshot fails. 
/**
 * Thrown when taking a snapshot fails.
 */
public class TransactionCouldNotTakeSnapshotException extends Exception {

  /**
   * @param message description of the failure, available through {@link #getMessage()}
   */
  public TransactionCouldNotTakeSnapshotException(final String message) {
    super(message);
  }
}
-public interface TransactionExecutor { - - /** - * A function is a class with a single method that takes an argument and returns a result. - * @param <I> the type of the argument - * @param <O> the type of the result - */ - public interface Function<I, O> { - O apply(I input) throws Exception; - } - - /** - * A procedure is a class with a single void method that takes an argument. - * @param <I> the type of the argument - */ - public interface Procedure<I> { - void apply(I input) throws Exception; - } - - /** - * A subroutine is a class with a single void method without arguments. - */ - public interface Subroutine { - void apply() throws Exception; - } - - // Due to a bug in checkstyle, it would emit false positives here of the form - // "Unable to get class information for @throws tag '<exn>' (...)". - // This comment disables that check up to the corresponding ON comments below - - // CHECKSTYLE OFF: @throws - - /** - * Execute a function under transactional semantics. A transaction is started and all datasets - * are initialized with the transaction. Then the passed function is executed, the transaction - * is committed, and the function return value is returned as the return value of this method. - * If any exception is caught, the transaction is aborted and the original exception is rethrown, - * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict, - * a TransactionConflictException is thrown. - * @param function the function to execute - * @param input the input parameter for the function - * @param <I> the input type of the function - * @param <O> the result type of the function - * @return the function's return value - * @throws TransactionConflictException if there is a write conflict with another transaction. - * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets. 
- */ - <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException; - - // CHECKSTYLE ON - - /** - * Like {@link #execute(Function, Object)} but without a return value. - */ - <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException; - - /** - * Like {@link #execute(Function, Object)} but the callable has no argument. - */ - <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException; - - /** - * Like {@link #execute(Function, Object)} but without argument or return value. - */ - void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException; - - /** - * Same as {@link #execute(Function, Object)} but - * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} - */ - <I, O> O executeUnchecked(Function<I, O> function, I input); - - /** - * Same as {@link #execute(Procedure, Object)} but - * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} - */ - <I> void executeUnchecked(Procedure<I> procedure, I input); - - /** - * Same as {@link #execute(Callable)} but - * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} - */ - <O> O executeUnchecked(Callable<O> callable); - - /** - * Same as {@link #execute(Subroutine)} but - * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} - */ - void executeUnchecked(Subroutine subroutine); - - /** - * Same as {@link #execute(Function, Object)} but executes asynchronously - */ - <I, O> ListenableFuture<O> submit(Function<I, O> function, I input); - - /** - * Same as {@link #execute(Procedure, Object)} but executes asynchronously - */ - <I> ListenableFuture<?> submit(Procedure<I> procedure, I input); - - /** - * Same as {@link #execute(Callable)} but executes asynchronously - */ - <O> ListenableFuture<O> submit(Callable<O> callable); - 
- /** - * Same as {@link #execute(Subroutine)} but executes asynchronously - */ - ListenableFuture<?> submit(Subroutine subroutine); - -} - http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java deleted file mode 100644 index 4f30478..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/TransactionExecutorFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra; - -/** - * A factory for transaction executors. - */ -public interface TransactionExecutorFactory { - - TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares); - -}
