http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java new file mode 100644 index 0000000..4c33538 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +/** + * Retry strategy for failed transactions. + */ +public interface RetryStrategy { + /** + * Returns the number of milliseconds to wait before retrying the operation. + * + * @param reason Reason for transaction failure. + * @param failureCount Number of times that the request has failed. + * @return Number of milliseconds to wait before retrying the operation. Returning {@code 0} means + * retry it immediately, while a negative value means abort the operation. + */ + long nextRetry(TransactionFailureException reason, int failureCount); +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java new file mode 100644 index 0000000..e5902a7 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionAdmin.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.inject.Guice; +import com.google.inject.Injector; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.runtime.ConfigModule; +import org.apache.tephra.runtime.DiscoveryModules; +import org.apache.tephra.runtime.TransactionClientModule; +import org.apache.tephra.runtime.TransactionModules; +import org.apache.tephra.runtime.ZKModule; +import org.apache.twill.zookeeper.ZKClientService; + +import java.io.PrintStream; +import java.util.List; +import java.util.Set; + +/** + * Allows calling some methods on {@link TransactionManager} from command line. + */ +public class TransactionAdmin { + private static final String OPT_TRUNCATE_INVALID_TX = "--truncate-invalid-tx"; + private static final String OPT_TRUNCATE_INVALID_TX_BEFORE = "--truncate-invalid-tx-before"; + private static final String OPT_GET_INVALID_TX_SIZE = "--get-invalid-tx-size"; + + private final PrintStream out; + private final PrintStream err; + + public static void main(String[] args) { + TransactionAdmin txAdmin = new TransactionAdmin(System.out, System.err); + int status = txAdmin.doMain(args, new Configuration()); + System.exit(status); + } + + public TransactionAdmin(PrintStream out, PrintStream err) { + this.out = out; + this.err = err; + } + + @VisibleForTesting + int doMain(String[] args, Configuration conf) { + if (args.length < 1) { + printUsage(); + return 1; + } + + Injector injector = Guice.createInjector( + new ConfigModule(conf), + new ZKModule(), + new DiscoveryModules().getDistributedModules(), + new TransactionModules().getDistributedModules(), + new TransactionClientModule() + ); + + ZKClientService zkClient = injector.getInstance(ZKClientService.class); + zkClient.startAndWait(); + + try { + 
TransactionSystemClient txClient = injector.getInstance(TransactionSystemClient.class); + String option = args[0]; + + if (option.equals(OPT_TRUNCATE_INVALID_TX)) { + if (args.length != 2) { + printUsage(); + return 1; + } + Set<Long> txIds; + try { + txIds = parseTxIds(args[1]); + } catch (NumberFormatException e) { + err.println("NumberFormatException: " + e.getMessage()); + return 1; + } + if (!txIds.isEmpty()) { + out.println("Invalid list size before truncation: " + txClient.getInvalidSize()); + txClient.truncateInvalidTx(txIds); + out.println("Invalid list size after truncation: " + txClient.getInvalidSize()); + } + } else if (option.equals(OPT_TRUNCATE_INVALID_TX_BEFORE)) { + if (args.length != 2) { + printUsage(); + return 1; + } + try { + long time = Long.parseLong(args[1]); + out.println("Invalid list size before truncation: " + txClient.getInvalidSize()); + txClient.truncateInvalidTxBefore(time); + out.println("Invalid list size after truncation: " + txClient.getInvalidSize()); + } catch (InvalidTruncateTimeException e) { + err.println(e.getMessage()); + return 1; + } catch (NumberFormatException e) { + err.println("NumberFormatException: " + e.getMessage()); + return 1; + } + } else if (option.equals(OPT_GET_INVALID_TX_SIZE)) { + if (args.length != 1) { + printUsage(); + return 1; + } + out.println("Invalid list size: " + txClient.getInvalidSize()); + } else { + printUsage(); + return 1; + } + } finally { + zkClient.stopAndWait(); + } + return 0; + } + + private Set<Long> parseTxIds(String option) throws NumberFormatException { + Set<Long> txIds = Sets.newHashSet(); + for (String str : Splitter.on(',').split(option)) { + txIds.add(Long.parseLong(str)); + } + return txIds; + } + + private void printUsage() { + String programName = TransactionAdmin.class.getSimpleName(); + String spaces = " "; + List<String> options = Lists.newArrayList(); + options.add(join("Usage: ")); + options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX, "<tx1,tx2,...>")); + 
options.add(join(spaces, programName, OPT_TRUNCATE_INVALID_TX_BEFORE, "<time in secs>")); + options.add(join(spaces, programName, OPT_GET_INVALID_TX_SIZE)); + + String usage = Joiner.on(System.getProperty("line.separator")).join(options); + err.println(usage); + } + + private static String join(String... args) { + return Joiner.on(" ").join(args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java new file mode 100644 index 0000000..c147917 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCodec.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra; + +import org.apache.tephra.distributed.TransactionConverterUtils; +import org.apache.tephra.distributed.thrift.TTransaction; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; + +import java.io.IOException; + +/** + * Handles serialization and deserialization of {@link Transaction} instances to and from {@code byte[]}. + */ +public class TransactionCodec { + + public TransactionCodec() { + } + + public byte[] encode(Transaction tx) throws IOException { + TTransaction thriftTx = TransactionConverterUtils.wrap(tx); + TSerializer serializer = new TSerializer(); + try { + return serializer.serialize(thriftTx); + } catch (TException te) { + throw new IOException(te); + } + } + + public Transaction decode(byte[] encoded) throws IOException { + TTransaction thriftTx = new TTransaction(); + TDeserializer deserializer = new TDeserializer(); + try { + deserializer.deserialize(thriftTx, encoded); + return TransactionConverterUtils.unwrap(thriftTx); + } catch (TException te) { + throw new IOException(te); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java new file mode 100644 index 0000000..22a59c6 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import javax.annotation.Nullable; + +/** + * Utility class that encapsulates the transaction life cycle over a given set of + * transaction-aware datasets. It is not thread-safe for concurrent execution. + */ +public class TransactionContext { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); + + private final Collection<TransactionAware> txAwares; + private final TransactionSystemClient txClient; + + private Transaction currentTx; + + public TransactionContext(TransactionSystemClient txClient, TransactionAware... txAwares) { + this(txClient, ImmutableList.copyOf(txAwares)); + } + + public TransactionContext(TransactionSystemClient txClient, Iterable<TransactionAware> txAwares) { + // Use a set to avoid adding the same TransactionAware twice and to make removal faster. + // Use a linked hash set so that insertion order is preserved (same behavior as when it was using a List). + this.txAwares = Sets.newLinkedHashSet(txAwares); + this.txClient = txClient; + } + + /** + * Adds a new transaction-aware to participate in the transaction. 
+ * @param txAware the new transaction-aware + */ + public boolean addTransactionAware(TransactionAware txAware) { + // If the txAware is newly added, call startTx as well if there is an active transaction + boolean added = txAwares.add(txAware); + if (added && currentTx != null) { + txAware.startTx(currentTx); + } + return added; + } + + /** + * Removes a {@link TransactionAware} and withdraws from participation in the transaction. + * Withdrawal is only allowed if there is no active transaction. + * + * @param txAware the {@link TransactionAware} to be removed + * @return true if the given {@link TransactionAware} is removed; false otherwise. + * @throws IllegalStateException if there is an active transaction going on with this TransactionContext. + */ + public boolean removeTransactionAware(TransactionAware txAware) { + Preconditions.checkState(currentTx == null, "Cannot remove TransactionAware while there is an active transaction."); + return txAwares.remove(txAware); + } + + /** + * Starts a new transaction. Calling this will initiate a new transaction using the {@link TransactionSystemClient}, + * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for each registered + * TransactionAware. If an exception is encountered, the transaction will be aborted and a + * {@code TransactionFailureException} wrapping the root cause will be thrown. + * + * @throws TransactionFailureException if an exception occurs starting the transaction with any registered + * TransactionAware + */ + public void start() throws TransactionFailureException { + currentTx = txClient.startShort(); + for (TransactionAware txAware : txAwares) { + try { + txAware.startTx(currentTx); + } catch (Throwable e) { + String message = String.format("Unable to start transaction-aware '%s' for transaction %d. 
", + txAware.getTransactionAwareName(), currentTx.getTransactionId()); + LOG.warn(message, e); + txClient.abort(currentTx); + throw new TransactionFailureException(message, e); + } + } + } + + /** + * Commits the current transaction. This will: check for any conflicts, based on the change set aggregated from + * all registered {@link TransactionAware} instances; flush any pending writes from the {@code TransactionAware}s; + * commit the current transaction with the {@link TransactionSystemClient}; and clear the current transaction state. + * + * @throws TransactionConflictException if a conflict is detected with a recently committed transaction + * @throws TransactionFailureException if an error occurs while committing + */ + public void finish() throws TransactionFailureException { + Preconditions.checkState(currentTx != null, "Cannot finish tx that has not been started"); + // each of these steps will abort and rollback the tx in case of errors, and throw an exception + checkForConflicts(); + persist(); + commit(); + postCommit(); + currentTx = null; + } + + /** + * Aborts the current transaction, and rolls back all data set changes. If rollback fails, + * the transaction is invalidated. If an exception is caught during rollback, the exception + * is rethrown wrapped in a TransactionFailureException, after all remaining TransactionAwares have + * completed rollback. + * + * @throws TransactionFailureException for any exception that is encountered. + */ + public void abort() throws TransactionFailureException { + abort(null); + } + + /** + * Checkpoints the current transaction by flushing any pending writes for the registered {@link TransactionAware} + * instances, and obtaining a new current write pointer for the transaction. By performing a checkpoint, + * the client can ensure that all previous writes were flushed and are visible. By default, the current write + * pointer for the transaction is also visible. 
The current write pointer can be excluded from read + * operations by calling {@link Transaction#setVisibility(Transaction.VisibilityLevel)} with the visibility level set + * to {@link Transaction.VisibilityLevel#SNAPSHOT_EXCLUDE_CURRENT} on the {@link Transaction} instance created + * by the checkpoint call, which can be retrieved by calling {@link #getCurrentTransaction()}. + * + * After the checkpoint operation is performed, the updated + * {@link Transaction} instance will be passed to {@link TransactionAware#updateTx(Transaction)} for each + * registered {@code TransactionAware} instance. + * + * @throws TransactionFailureException if an error occurs while performing the checkpoint + */ + public void checkpoint() throws TransactionFailureException { + Preconditions.checkState(currentTx != null, "Cannot checkpoint tx that has not been started"); + persist(); + try { + currentTx = txClient.checkpoint(currentTx); + // update the current transaction with all TransactionAwares + for (TransactionAware txAware : txAwares) { + txAware.updateTx(currentTx); + } + } catch (TransactionNotInProgressException e) { + String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } catch (Throwable e) { + String message = String.format("Exception from checkpoint for transaction %d.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } + } + + /** + * Returns the current transaction or null if no transaction is currently in progress. + */ + @Nullable + public Transaction getCurrentTransaction() { + return currentTx; + } + + // CHECKSTYLE IGNORE "@throws" FOR 11 LINES + /** + * Aborts the current transaction, and rolls back all data set changes. If rollback fails, + * the transaction is invalidated. 
If an exception is caught during rollback, the exception + * is rethrown wrapped into a TransactionFailureException, after all remaining TransactionAwares have + * completed rollback. If an existing exception is passed in, that exception is thrown in either + * case, whether the rollback is successful or not. In other words, this method always throws the + * first exception that it encounters. + * @param cause the original exception that caused the abort + * @throws TransactionFailureException for any exception that is encountered. + */ + public void abort(TransactionFailureException cause) throws TransactionFailureException { + if (currentTx == null) { + // might be called by some generic exception handler even though already aborted/finished - we allow that + return; + } + try { + boolean success = true; + for (TransactionAware txAware : txAwares) { + try { + if (!txAware.rollbackTx()) { + success = false; + } + } catch (Throwable e) { + String message = String.format("Unable to roll back changes in transaction-aware '%s' for transaction %d. ", + txAware.getTransactionAwareName(), currentTx.getTransactionId()); + LOG.warn(message, e); + if (cause == null) { + cause = new TransactionFailureException(message, e); + } + success = false; + } + } + if (success) { + txClient.abort(currentTx); + } else { + txClient.invalidate(currentTx.getTransactionId()); + } + if (cause != null) { + throw cause; + } + } finally { + currentTx = null; + } + } + + private void checkForConflicts() throws TransactionFailureException { + Collection<byte[]> changes = Lists.newArrayList(); + for (TransactionAware txAware : txAwares) { + try { + changes.addAll(txAware.getTxChanges()); + } catch (Throwable e) { + String message = String.format("Unable to retrieve changes from transaction-aware '%s' for transaction %d. 
", + txAware.getTransactionAwareName(), currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } + } + + boolean canCommit = false; + try { + canCommit = txClient.canCommit(currentTx, changes); + } catch (TransactionNotInProgressException e) { + String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } catch (Throwable e) { + String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } + if (!canCommit) { + String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); + abort(new TransactionConflictException(message)); + // abort will throw + } + } + + private void persist() throws TransactionFailureException { + for (TransactionAware txAware : txAwares) { + boolean success; + Throwable cause = null; + try { + success = txAware.commitTx(); + } catch (Throwable e) { + success = false; + cause = e; + } + if (!success) { + String message = String.format("Unable to persist changes of transaction-aware '%s' for transaction %d. 
", + txAware.getTransactionAwareName(), currentTx.getTransactionId()); + if (cause == null) { + LOG.warn(message); + } else { + LOG.warn(message, cause); + } + abort(new TransactionFailureException(message, cause)); + // abort will throw that exception + } + } + } + + private void commit() throws TransactionFailureException { + boolean commitSuccess = false; + try { + commitSuccess = txClient.commit(currentTx); + } catch (TransactionNotInProgressException e) { + String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } catch (Throwable e) { + String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId()); + LOG.warn(message, e); + abort(new TransactionFailureException(message, e)); + // abort will throw that exception + } + if (!commitSuccess) { + String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId()); + abort(new TransactionConflictException(message)); + // abort will throw + } + } + + private void postCommit() throws TransactionFailureException { + TransactionFailureException cause = null; + for (TransactionAware txAware : txAwares) { + try { + txAware.postTxCommit(); + } catch (Throwable e) { + String message = String.format("Unable to perform post-commit in transaction-aware '%s' for transaction %d. 
", + txAware.getTransactionAwareName(), currentTx.getTransactionId()); + LOG.warn(message, e); + cause = new TransactionFailureException(message, e); + } + } + if (cause != null) { + throw cause; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java new file mode 100644 index 0000000..3af9596 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionCouldNotTakeSnapshotException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +/** + * Throw when taking a snapshot fails. 
+ */ +public class TransactionCouldNotTakeSnapshotException extends Exception { + public TransactionCouldNotTakeSnapshotException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java new file mode 100644 index 0000000..ed5029e --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutor.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.concurrent.Callable; + +/** + * Utility that wraps the execution of a function into the context of a transaction. + */ +// todo: implementations should throw different from TransactionFailureException in case of user code error? +// todo: accept only Callable? Executors util has a way to convert everything to Callable... 
+public interface TransactionExecutor { + + /** + * A function is a class with a single method that takes an argument and returns a result. + * @param <I> the type of the argument + * @param <O> the type of the result + */ + public interface Function<I, O> { + O apply(I input) throws Exception; + } + + /** + * A procedure is a class with a single void method that takes an argument. + * @param <I> the type of the argument + */ + public interface Procedure<I> { + void apply(I input) throws Exception; + } + + /** + * A subroutine is a class with a single void method without arguments. + */ + public interface Subroutine { + void apply() throws Exception; + } + + // Due to a bug in checkstyle, it would emit false positives here of the form + // "Unable to get class information for @throws tag '<exn>' (...)". + // This comment disables that check up to the corresponding ON comments below + + // CHECKSTYLE OFF: @throws + + /** + * Execute a function under transactional semantics. A transaction is started and all datasets + * are initialized with the transaction. Then the passed function is executed, the transaction + * is committed, and the function return value is returned as the return value of this method. + * If any exception is caught, the transaction is aborted and the original exception is rethrown, + * wrapped into a TransactionFailureException. If the transaction fails due to a write conflict, + * a TransactionConflictException is thrown. + * @param function the function to execute + * @param input the input parameter for the function + * @param <I> the input type of the function + * @param <O> the result type of the function + * @return the function's return value + * @throws TransactionConflictException if there is a write conflict with another transaction. + * @throws TransactionFailureException if any exception is caught, be it from the function or from the datasets. 
+ */ + <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException; + + // CHECKSTYLE ON + + /** + * Like {@link #execute(Function, Object)} but without a return value. + */ + <I> void execute(Procedure<I> procedure, I input) throws TransactionFailureException, InterruptedException; + + /** + * Like {@link #execute(Function, Object)} but the callable has no argument. + */ + <O> O execute(Callable<O> callable) throws TransactionFailureException, InterruptedException; + + /** + * Like {@link #execute(Function, Object)} but without argument or return value. + */ + void execute(Subroutine subroutine) throws TransactionFailureException, InterruptedException; + + /** + * Same as {@link #execute(Function, Object)} but + * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} + */ + <I, O> O executeUnchecked(Function<I, O> function, I input); + + /** + * Same as {@link #execute(Procedure, Object)} but + * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} + */ + <I> void executeUnchecked(Procedure<I> procedure, I input); + + /** + * Same as {@link #execute(Callable)} but + * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} + */ + <O> O executeUnchecked(Callable<O> callable); + + /** + * Same as {@link #execute(Subroutine)} but + * suppresses exception with {@link com.google.common.base.Throwables#propagate(Throwable)} + */ + void executeUnchecked(Subroutine subroutine); + + /** + * Same as {@link #execute(Function, Object)} but executes asynchronously + */ + <I, O> ListenableFuture<O> submit(Function<I, O> function, I input); + + /** + * Same as {@link #execute(Procedure, Object)} but executes asynchronously + */ + <I> ListenableFuture<?> submit(Procedure<I> procedure, I input); + + /** + * Same as {@link #execute(Callable)} but executes asynchronously + */ + <O> ListenableFuture<O> submit(Callable<O> callable); + 
+ /** + * Same as {@link #execute(Subroutine)} but executes asynchronously + */ + ListenableFuture<?> submit(Subroutine subroutine); + +} + http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java new file mode 100644 index 0000000..afe0f33 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/TransactionExecutorFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra; + +/** + * A factory for transaction executors. + */ +public interface TransactionExecutorFactory { + + TransactionExecutor createExecutor(Iterable<TransactionAware> txAwares); + +}
