http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java new file mode 100644 index 0000000..8767433 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/testing/easymock/EasyMockTest.java @@ -0,0 +1,124 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.testing.easymock; + +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.WildcardType; + +import com.google.common.base.Preconditions; +import com.google.common.reflect.TypeToken; +import com.google.common.testing.TearDown; +import com.google.common.testing.junit4.TearDownTestCase; + +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.IMocksControl; +import org.junit.Before; + +import static org.easymock.EasyMock.createControl; + +/** + * A baseclass for tests that use EasyMock. A new {@link IMocksControl control} is set up before + * each test and the mocks created and replayed with it are verified during tear down. + * + * @author John Sirois + */ +public abstract class EasyMockTest extends TearDownTestCase { + protected IMocksControl control; + + /** + * Creates an EasyMock {@link #control} for tests to use that will be automatically + * {@link IMocksControl#verify() verified} on tear down. + */ + @Before + public final void setupEasyMock() { + control = createControl(); + addTearDown(new TearDown() { + @Override public void tearDown() { + control.verify(); + } + }); + } + + /** + * Creates an EasyMock mock with this test's control. Will be + * {@link IMocksControl#verify() verified} in a tear down. + */ + public <T> T createMock(Class<T> type) { + Preconditions.checkNotNull(type); + return control.createMock(type); + } + + /** + * A class meant to be sub-classed in order to capture a generic type literal value. 
To capture + * the type of a {@code List<String>} you would use: {@code new Clazz<List<String>>() {}} + */ + public abstract static class Clazz<T> extends TypeToken { + Class<T> rawType() { + @SuppressWarnings("unchecked") + Class<T> rawType = (Class<T>) findRawType(); + return rawType; + } + + private Class<?> findRawType() { + if (getType() instanceof Class<?>) { // Plain old + return (Class<?>) getType(); + + } else if (getType() instanceof ParameterizedType) { // Nested type parameter + ParameterizedType parametrizedType = (ParameterizedType) getType(); + Type rawType = parametrizedType.getRawType(); + return (Class<?>) rawType; + } else if (getType() instanceof GenericArrayType) { + throw new IllegalStateException("cannot mock arrays, rejecting type: " + getType()); + } else if (getType() instanceof WildcardType) { + throw new IllegalStateException( + "wildcarded instantiations are not allowed in java, rejecting type: " + getType()); + } else { + throw new IllegalArgumentException("Could not decode raw type for: " + getType()); + } + } + + public T createMock() { + return EasyMock.createMock(rawType()); + } + + public T createMock(IMocksControl control) { + return control.createMock(rawType()); + } + } + + /** + * Creates an EasyMock mock with this test's control. Will be + * {@link IMocksControl#verify() verified} in a tear down. + * + * Allows for mocking of parameterized types without all the unchecked conversion warnings in a + * safe way. + */ + public <T> T createMock(Clazz<T> type) { + Preconditions.checkNotNull(type); + return type.createMock(control); + } + + /** + * A type-inferring convenience method for creating new captures. + */ + public static <T> Capture<T> createCapture() { + return new Capture<T>(); + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java new file mode 100644 index 0000000..5197e91 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/testing/easymock/IterableEquals.java @@ -0,0 +1,77 @@ +package com.twitter.common.testing.easymock; + +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multiset; + +import org.easymock.IArgumentMatcher; + +import static org.easymock.EasyMock.reportMatcher; + +/** + * This EasyMock argument matcher tests Iterables for equality irrespective of order. + * + * @param <T> type argument for the Iterables being matched. + */ +public class IterableEquals<T> implements IArgumentMatcher { + private final Multiset<T> elements = HashMultiset.create(); + + /** + * Constructs an IterableEquals object that tests for equality against the specified expected + * Iterable. + * + * @param expected an Iterable containing the elements that are expected, in any order. + */ + public IterableEquals(Iterable<T> expected) { + Iterables.addAll(elements, expected); + } + + @Override + public boolean matches(Object observed) { + if (observed instanceof Iterable<?>) { + Multiset<Object> observedElements = HashMultiset.create((Iterable<?>) observed); + return elements.equals(observedElements); + } + return false; + } + + @Override + public void appendTo(StringBuffer buffer) { + buffer.append("eqIterable(").append(elements).append(")"); + } + + /** + * When used in EasyMock expectations, this matches an Iterable having the same elements in any + * order. 
+ * + * @return null, to avoid a compile time error. + */ + public static <T> Iterable<T> eqIterable(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } + + /** + * When used in EasyMock expectations, this matches a List having the same elements in any order. + * + * @return null, to avoid a compile time error. + */ + public static <T> List<T> eqList(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } + + /** + * When used in EasyMock expectations, this matches a Collection having the same elements in any + * order. + * + * @return null, to avoid a compile time error. + */ + public static <T> Collection<T> eqCollection(Iterable<T> in) { + reportMatcher(new IterableEquals(in)); + return null; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java new file mode 100644 index 0000000..5658ff6 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/testing/junit/rules/Retry.java @@ -0,0 +1,161 @@ +// ================================================================================================= +// Copyright 2015 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.testing.junit.rules; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + +import org.junit.rules.MethodRule; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.Statement; + +/** + * A test method annotation useful for smoking out flaky behavior in tests. + * + * @see Retry.Rule RetryRule needed to enable this annotation in a test class. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Retry { + + /** + * The number of times to retry the test. + * + * When a {@link Retry.Rule} is installed and a test method is annotated for {@literal @Retry}, + * it will be retried 0 to N times. If times is negative, it is treated as 0 and no retries are + * performed. If times is >= 1 then a successful execution of the annotated test method is + * retried until the 1st error, failure or otherwise up to {@code times} times. + */ + int times() default 1; + + /** + * Enables {@link Retry @Retry}able tests. 
+ */ + class Rule implements MethodRule { + private interface ThrowableFactory { + Throwable create(String message, Throwable cause); + } + + private static Throwable annotate( + int tryNumber, + final int maxRetries, + Throwable cause, + String prefix, + ThrowableFactory throwableFactory) { + + Throwable annotated = + throwableFactory.create( + String.format("%s on try %d of %d: %s", prefix, tryNumber, maxRetries + 1, + Objects.firstNonNull(cause.getMessage(), "")), cause); + annotated.setStackTrace(cause.getStackTrace()); + return annotated; + } + + static class RetriedAssertionError extends AssertionError { + private final int tryNumber; + private final int maxRetries; + + RetriedAssertionError(int tryNumber, int maxRetries, String message, Throwable cause) { + // We do a manual initCause here to be compatible with the Java 1.6 AssertionError + // constructors. + super(message); + initCause(cause); + + this.tryNumber = tryNumber; + this.maxRetries = maxRetries; + } + + @VisibleForTesting + int getTryNumber() { + return tryNumber; + } + + @VisibleForTesting + int getMaxRetries() { + return maxRetries; + } + } + + private static Throwable annotate(final int tryNumber, final int maxRetries, AssertionError e) { + return annotate(tryNumber, maxRetries, e, "Failure", new ThrowableFactory() { + @Override public Throwable create(String message, Throwable cause) { + return new RetriedAssertionError(tryNumber, maxRetries, message, cause); + } + }); + } + + static class RetriedException extends Exception { + private final int tryNumber; + private final int maxRetries; + + RetriedException(int tryNumber, int maxRetries, String message, Throwable cause) { + super(message, cause); + this.tryNumber = tryNumber; + this.maxRetries = maxRetries; + } + + @VisibleForTesting + int getTryNumber() { + return tryNumber; + } + + @VisibleForTesting + int getMaxRetries() { + return maxRetries; + } + } + + private static Throwable annotate(final int tryNumber, final int maxRetries, 
Exception e) { + return annotate(tryNumber, maxRetries, e, "Error", new ThrowableFactory() { + @Override public Throwable create(String message, Throwable cause) { + return new RetriedException(tryNumber, maxRetries, message, cause); + } + }); + } + + @Override + public Statement apply(final Statement statement, FrameworkMethod method, Object receiver) { + Retry retry = method.getAnnotation(Retry.class); + if (retry == null || retry.times() <= 0) { + return statement; + } else { + final int times = retry.times(); + return new Statement() { + @Override public void evaluate() throws Throwable { + for (int i = 0; i <= times; i++) { + try { + statement.evaluate(); + } catch (AssertionError e) { + throw annotate(i + 1, times, e); + // We purposefully catch any non-assertion exceptions in order to tag the try count + // for erroring (as opposed to failing) tests. + // SUPPRESS CHECKSTYLE RegexpSinglelineJava + } catch (Exception e) { + throw annotate(i + 1, times, e); + } + } + } + }; + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java new file mode 100644 index 0000000..a56bb2b --- /dev/null +++ b/commons/src/main/java/com/twitter/common/testing/mockito/MockitoTest.java @@ -0,0 +1,34 @@ +// ================================================================================================= +// Copyright 2012 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.testing.mockito; + +import org.junit.Before; +import org.mockito.MockitoAnnotations; + +/** + * A base class for tests that use Mockito. Before each test, it initializes all the mocks + * declared in the class. + */ +public abstract class MockitoTest { + /** + * Initializes all fields annotated with {@link org.mockito.Mock}. + */ + @Before + public final void initMockito() { + MockitoAnnotations.initMocks(this); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Config.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/Config.java b/commons/src/main/java/com/twitter/common/thrift/Config.java new file mode 100644 index 0000000..977489f --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/Config.java @@ -0,0 +1,305 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stats; +import com.twitter.common.stats.StatsProvider; + +/** + * Represents the configuration for a thrift call. Use {@link #builder()} to create a new one or + * or {@link #builder(Config)} to create a new config based on another config. + * + * <p>If a deadline is specified, it acts as a global timeout for each thrift call made. + * Obtaining connections, performing the remote call and executing retries are all expected to + * complete within this deadline. When the specified deadline is not met, an + * {@link TTimeoutException} will be thrown. + * + * <p>If max retries is specified as zero (never retry), then the list of retryable exceptions are + * ignored. It is only when max retries is greater than zero that list of retryable exceptions is + * used to determine if a particular failed call should be retried. + * + * @author John Sirois + */ +public class Config { + + /** + * Created a builder for a new {@link Config}. 
Default values are as follows: + * <ul> + * <li>{@link #getRequestTimeout()} 0 + * <li>{@link #getMaxRetries()} 0 + * <li>{@link #getRetryableExceptions()} [] + * <li>{@link #isDebug()} ()} false + * </ul> + */ + public static Builder builder() { + return new Builder(); + } + + /** + * + * @param config the builder configuration to use + */ + public static Builder builder(Config config) { + Preconditions.checkNotNull(config); + return new Builder(config); + } + + private static final Amount<Long,Time> DEADLINE_BLOCKING = Amount.of(0L, Time.MILLISECONDS); + + @VisibleForTesting + static final Amount<Long,Time> DEFAULT_CONNECT_TIMEOUT = Amount.of(5L, Time.SECONDS); + + private Amount<Long, Time> requestTimeout = DEADLINE_BLOCKING; + private Amount<Long, Time> connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int maxRetries; + private ImmutableSet<Class<? extends Exception>> retryableExceptions = ImmutableSet.of(); + private boolean debug = false; + private boolean enableStats = true; + private StatsProvider statsProvider = Stats.STATS_PROVIDER; + + private Config() { + // defaults + } + + private Config(Config copyFrom) { + requestTimeout = copyFrom.requestTimeout; + maxRetries = copyFrom.maxRetries; + retryableExceptions = copyFrom.retryableExceptions; + debug = copyFrom.debug; + statsProvider = copyFrom.statsProvider; + } + + /** + * Returns the maximum time to wait for any thrift call to complete. A deadline of 0 means to + * wait forever + */ + public Amount<Long, Time> getRequestTimeout() { + return requestTimeout; + } + + /** + * Returns the maximum time to wait for a connection to be established. A deadline of 0 means to + * wait forever + */ + public Amount<Long, Time> getConnectTimeout() { + return connectTimeout; + } + + /** + * Returns the maximum number of retries to perform for each thrift call. A value of 0 means to + * never retry and in this case {@link #getRetryableExceptions()} will be an empty set. 
+ */ + public int getMaxRetries() { + return maxRetries; + } + + /** + * Returns the set of exceptions to retry calls for. The returned set will only be empty if + * {@link #getMaxRetries()} is 0. + */ + public ImmutableSet<Class<? extends Exception>> getRetryableExceptions() { + return retryableExceptions; + } + + /** + * Returns {@code true} if the client should log extra debugging information. Currently this + * includes method call arguments when RPCs fail with exceptions. + */ + public boolean isDebug() { + return debug; + } + + /** + * Returns {@code true} if the client should track request statistics. + */ + public boolean enableStats() { + return enableStats; + } + + /** + * Returns the stats provider to use to record Thrift client stats. + */ + public StatsProvider getStatsProvider() { + return statsProvider; + } + + // This was made public because it seems to be causing problems for scala users when it is not + // public. + public static abstract class AbstractBuilder<T extends AbstractBuilder> { + private final Config config; + + AbstractBuilder() { + this.config = new Config(); + } + + AbstractBuilder(Config template) { + Preconditions.checkNotNull(template); + this.config = new Config(template); + } + + protected abstract T getThis(); + + // TODO(John Sirois): extra validation or design ... can currently do strange things like: + // builder.blocking().withDeadline(1, TimeUnit.MILLISECONDS) + // builder.noRetries().retryOn(TException.class) + + /** + * Specifies that all calls be blocking calls with no inherent deadline. It may be the + * case that underlying transports will eventually deadline, but {@link Thrift} will not + * enforce a deadline. + */ + public final T blocking() { + config.requestTimeout = DEADLINE_BLOCKING; + return getThis(); + } + + /** + * Specifies that all calls be subject to a global timeout. This deadline includes all call + * activities, including obtaining a free connection and any automatic retries. 
+ */ + public final T withRequestTimeout(Amount<Long, Time> timeout) { + Preconditions.checkNotNull(timeout); + Preconditions.checkArgument(timeout.getValue() >= 0, + "A negative deadline is invalid: %s", timeout); + config.requestTimeout = timeout; + return getThis(); + } + + /** + * Assigns the timeout for all connections established with the blocking client. + * On an asynchronous client this timeout is only used for the connection pool lock + * acquisition on initial calls (not retries, @see withRetries). The actual network + * connection timeout for the asynchronous client is governed by socketTimeout. + * + * @param timeout Connection timeout. + * @return A reference to the builder. + */ + public final T withConnectTimeout(Amount<Long, Time> timeout) { + Preconditions.checkNotNull(timeout); + Preconditions.checkArgument(timeout.getValue() >= 0, + "A negative deadline is invalid: %s", timeout); + config.connectTimeout = timeout; + return getThis(); + } + + /** + * Specifies that no calls be automatically retried. + */ + public final T noRetries() { + config.maxRetries = 0; + config.retryableExceptions = ImmutableSet.of(); + return getThis(); + } + + /** + * Specifies that failing calls meeting {@link #retryOn retry} criteria be retried up to a + * maximum of {@code retries} times before failing. On an asynchronous client, these retries + * will be forced to be non-blocking, failing fast if they cannot immediately acquire the + * connection pool locks, so they only provide a best-effort retry strategy there. + */ + public final T withRetries(int retries) { + Preconditions.checkArgument(retries >= 0, "A negative retry count is invalid: %d", retries); + config.maxRetries = retries; + return getThis(); + } + + /** + * Specifies the set of exception classes that are to be considered retryable (if retries are + * enabled). 
Any exceptions thrown by the underlying thrift call will be considered retryable + * if they are an instance of any one of the specified exception classes. The set of exception + * classes must contain at least exception class. To specify no retries either use + * {@link #noRetries()} or pass zero to {@link #withRetries(int)}. + */ + public final T retryOn(Iterable<? extends Class<? extends Exception>> retryableExceptions) { + Preconditions.checkNotNull(retryableExceptions); + ImmutableSet<Class<? extends Exception>> classes = + ImmutableSet.copyOf(Iterables.filter(retryableExceptions, Predicates.notNull())); + Preconditions.checkArgument(!classes.isEmpty(), + "Must provide at least one retryable exception class"); + config.retryableExceptions = classes; + return getThis(); + } + + /** + * Specifies the set of exception classes that are to be considered retryable (if retries are + * enabled). Any exceptions thrown by the underlying thrift call will be considered retryable + * if they are an instance of any one of the specified exception classes. The set of exception + * classes must contain at least exception class. To specify no retries either use + * {@link #noRetries()} or pass zero to {@link #withRetries(int)}. + */ + public final T retryOn(Class<? extends Exception> exception) { + Preconditions.checkNotNull(exception); + config.retryableExceptions = + ImmutableSet.<Class<? extends Exception>>builder().add(exception).build(); + return getThis(); + } + + /** + * When {@code debug == true}, specifies that extra debugging information should be logged. + */ + public final T withDebug(boolean debug) { + config.debug = debug; + return getThis(); + } + + /** + * Disables stats collection on the client (enabled by default). + */ + public T disableStats() { + config.enableStats = false; + return getThis(); + } + + /** + * Registers a custom stats provider to use to track various client stats. 
+ */ + public T withStatsProvider(StatsProvider statsProvider) { + config.statsProvider = Preconditions.checkNotNull(statsProvider); + return getThis(); + } + + protected final Config getConfig() { + return config; + } + } + + public static final class Builder extends AbstractBuilder<Builder> { + private Builder() { + super(); + } + + private Builder(Config template) { + super(template); + } + + @Override + protected Builder getThis() { + return this; + } + + public Config create() { + return getConfig(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java new file mode 100644 index 0000000..fb9194d --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/TResourceExhaustedException.java @@ -0,0 +1,42 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.thrift; + +import org.apache.thrift.TException; + +/** + * @author Adam Samet + * + * This is exception is thrown when there are no available instances of a thrift backend + * service to serve requests. + */ +public class TResourceExhaustedException extends TException { + + private static final long serialVersionUID = 1L; + + public TResourceExhaustedException(String message) { + super(message); + } + + public TResourceExhaustedException(Throwable cause) { + super(cause); + } + + public TResourceExhaustedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java new file mode 100644 index 0000000..50020bd --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/TTimeoutException.java @@ -0,0 +1,41 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import org.apache.thrift.TException; + +/** + * @author Adam Samet + * + * This is exception is thrown when accessing a thrift service resource times out. + */ +public class TTimeoutException extends TException { + + private static final long serialVersionUID = 1L; + + public TTimeoutException(String message) { + super(message); + } + + public TTimeoutException(Throwable cause) { + super(cause); + } + + public TTimeoutException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java new file mode 100644 index 0000000..329c03f --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/TTransportConnection.java @@ -0,0 +1,73 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import com.google.common.base.Preconditions; +import com.twitter.common.net.pool.Connection; +import com.twitter.common.net.pool.ConnectionPool; +import org.apache.thrift.transport.TTransport; + +import java.net.InetSocketAddress; + +/** + * A {@link ConnectionPool} compatible thrift connection that can work with any valid thrift + * transport. + * + * @author John Sirois + */ +public class TTransportConnection implements Connection<TTransport, InetSocketAddress> { + + private final TTransport transport; + private final InetSocketAddress endpoint; + + public TTransportConnection(TTransport transport, InetSocketAddress endpoint) { + this.transport = Preconditions.checkNotNull(transport); + this.endpoint = Preconditions.checkNotNull(endpoint); + } + + /** + * Returns {@code true} if the underlying transport is still open. To invalidate a transport it + * should be closed. 
+ * + * <p>TODO(John Sirois): it seems like an improper soc to have validity testing here and not also an + * invalidation method - correct or accept + */ + @Override + public boolean isValid() { + return transport.isOpen(); + } + + @Override + public TTransport get() { + return transport; + } + + @Override + public void close() { + transport.close(); + } + + @Override + public InetSocketAddress getEndpoint() { + return endpoint; + } + + @Override + public String toString() { + return endpoint.toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/Thrift.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/Thrift.java b/commons/src/main/java/com/twitter/common/thrift/Thrift.java new file mode 100644 index 0000000..aecf251 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/Thrift.java @@ -0,0 +1,393 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. +// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// ================================================================================================= + +package com.twitter.common.thrift; + +import java.io.IOException; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.net.loadbalancing.RequestTracker; +import com.twitter.common.net.pool.Connection; +import com.twitter.common.net.pool.ObjectPool; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.thrift.callers.Caller; +import com.twitter.common.thrift.callers.DeadlineCaller; +import com.twitter.common.thrift.callers.DebugCaller; +import com.twitter.common.thrift.callers.RetryingCaller; +import com.twitter.common.thrift.callers.StatTrackingCaller; +import com.twitter.common.thrift.callers.ThriftCaller; + +/** + * A generic thrift client that handles reconnection in the case of protocol errors, automatic + * retries, call deadlines and call statistics tracking. This class aims for behavior compatible + * with the <a href="http://github.com/fauna/thrift_client">generic ruby thrift client</a>. 
+ * + * <p>In order to enforce call deadlines for synchronous clients, this class uses an + * {@link java.util.concurrent.ExecutorService}. If a custom executor is supplied, it should throw + * a subclass of {@link RejectedExecutionException} to signal thread resource exhaustion, in which + * case the client will fail fast and propagate the event as a {@link TResourceExhaustedException}. + * + * TODO(William Farner): Before open sourcing, look into changing the current model of wrapped proxies + * to use a single proxy and wrapped functions for decorators. + * + * @author John Sirois + */ +public class Thrift<T> { + + /** + * The default thrift call configuration used if none is specified. + * + * Specifies the following settings: + * <ul> + * <li>global call timeout: 1 second + * <li>call retries: 0 + * <li>retryable exceptions: TTransportException (network exceptions including socket timeouts) + * <li>wait for connections: true + * <li>debug: false + * </ul> + */ + public static final Config DEFAULT_CONFIG = Config.builder() + .withRequestTimeout(Amount.of(1L, Time.SECONDS)) + .noRetries() + .retryOn(TTransportException.class) // if maxRetries is set non-zero + .create(); + + /** + * The default thrift call configuration used for an async client if none is specified. + * + * Specifies the following settings: + * <ul> + * <li>global call timeout: none + * <li>call retries: 0 + * <li>retryable exceptions: IOException, TTransportException + * (network exceptions but not timeouts) + * <li>wait for connections: true + * <li>debug: false + * </ul> + */ + @SuppressWarnings("unchecked") + public static final Config DEFAULT_ASYNC_CONFIG = Config.builder(DEFAULT_CONFIG) + .withRequestTimeout(Amount.of(0L, Time.SECONDS)) + .noRetries() + .retryOn(ImmutableSet.<Class<? 
extends Exception>>builder() + .add(IOException.class) + .add(TTransportException.class).build()) // if maxRetries is set non-zero + .create(); + + private final Config defaultConfig; + private final ExecutorService executorService; + private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool; + private final RequestTracker<InetSocketAddress> requestTracker; + private final String serviceName; + private final Class<T> serviceInterface; + private final Function<TTransport, T> clientFactory; + private final boolean async; + private final boolean withSsl; + + /** + * Constructs an instance with the {@link #DEFAULT_CONFIG}, cached thread pool + * {@link ExecutorService}, and synchronous calls. + * + * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, + * boolean, boolean) + */ + public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, + String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory) { + + this(DEFAULT_CONFIG, connectionPool, requestTracker, serviceName, serviceInterface, + clientFactory, false, false); + } + + /** + * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool + * {@link ExecutorService}. + * + * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, + * boolean, boolean) + */ + public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, + String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, + boolean async) { + + this(getConfig(async), connectionPool, requestTracker, serviceName, + serviceInterface, clientFactory, async, false); + } + + /** + * Constructs an instance with the {@link #DEFAULT_CONFIG} and cached thread pool + * {@link ExecutorService}. 
+ * + * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, + * boolean, boolean) + */ + public Thrift(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, + String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, + boolean async, boolean ssl) { + + this(getConfig(async), connectionPool, requestTracker, serviceName, + serviceInterface, clientFactory, async, ssl); + } + + /** + * Constructs an instance with a cached thread pool {@link ExecutorService}. + * + * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, + * boolean, boolean) + */ + public Thrift(Config config, ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, + String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, + boolean async, boolean ssl) { + + this(config, + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Thrift["+ serviceName +"][%d]") + .build()), + connectionPool, requestTracker, serviceName, serviceInterface, clientFactory, async, ssl); + } + + /** + * Constructs an instance with the {@link #DEFAULT_CONFIG}. + * + * @see #Thrift(Config, ExecutorService, ObjectPool, RequestTracker , String, Class, Function, + * boolean, boolean) + */ + public Thrift(ExecutorService executorService, + ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, + String serviceName, Class<T> serviceInterface, Function<TTransport, T> clientFactory, + boolean async, boolean ssl) { + + this(getConfig(async), executorService, connectionPool, requestTracker, serviceName, + serviceInterface, clientFactory, async, ssl); + } + + private static Config getConfig(boolean async) { + return async ? 
DEFAULT_ASYNC_CONFIG : DEFAULT_CONFIG; + } + + /** + * Constructs a new Thrift factory for creating clients that make calls to a particular thrift + * service. + * + * <p>Note that the combination of {@code config} and {@code connectionPool} need to be chosen + * with care depending on usage of the generated thrift clients. In particular, if configured + * to not wait for connections, the {@code connectionPool} ought to be warmed up with a set of + * connections or else be actively building connections in the background. + * + * <p>TODO(John Sirois): consider adding an method to ObjectPool that would allow Thrift to handle + * this case by pro-actively warming the pool. + * + * @param config the default configuration to use for all thrift calls; also the configuration all + * {@link ClientBuilder}s start with + * @param executorService for invoking calls with a specified deadline + * @param connectionPool the source for thrift connections + * @param serviceName a /vars friendly name identifying the service clients will connect to + * @param serviceInterface the thrift compiler generate interface class for the remote service + * (Iface) + * @param clientFactory a function that can generate a concrete thrift client for the given + * {@code serviceInterface} + * @param async enable asynchronous API + * @param ssl enable TLS handshaking for Thrift calls + */ + public Thrift(Config config, ExecutorService executorService, + ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, + RequestTracker<InetSocketAddress> requestTracker, String serviceName, + Class<T> serviceInterface, Function<TTransport, T> clientFactory, boolean async, boolean ssl) { + + defaultConfig = Preconditions.checkNotNull(config); + this.executorService = Preconditions.checkNotNull(executorService); + this.connectionPool = Preconditions.checkNotNull(connectionPool); + this.requestTracker = Preconditions.checkNotNull(requestTracker); + this.serviceName = 
MorePreconditions.checkNotBlank(serviceName); + this.serviceInterface = checkServiceInterface(serviceInterface); + this.clientFactory = Preconditions.checkNotNull(clientFactory); + this.async = async; + this.withSsl = ssl; + } + + static <I> Class<I> checkServiceInterface(Class<I> serviceInterface) { + Preconditions.checkNotNull(serviceInterface); + Preconditions.checkArgument(serviceInterface.isInterface(), + "%s must be a thrift service interface", serviceInterface); + return serviceInterface; + } + + /** + * Closes any open connections and prepares this thrift client for graceful shutdown. Any thrift + * client proxies returned from {@link #create()} will become invalid. + */ + public void close() { + connectionPool.close(); + executorService.shutdown(); + } + + /** + * A builder class that allows modifications of call behavior to be made for a given Thrift + * client. Note that in the case of conflicting configuration calls, the last call wins. So, + * for example, the following sequence would result in all calls being subject to a 5 second + * global deadline: + * <code> + * builder.blocking().withDeadline(5, TimeUnit.SECONDS).create() + * </code> + * + * @see Config + */ + public final class ClientBuilder extends Config.AbstractBuilder<ClientBuilder> { + private ClientBuilder(Config template) { + super(template); + } + + @Override + protected ClientBuilder getThis() { + return this; + } + + /** + * Creates a new client using the built up configuration changes. + */ + public T create() { + return createClient(getConfig()); + } + } + + /** + * Creates a new thrift client builder that inherits this Thrift instance's default configuration. + * This is useful for customizing a client for a particular thrift call that makes sense to treat + * differently from the rest of the calls to a given service. + */ + public ClientBuilder builder() { + return builder(defaultConfig); + } + + /** + * Creates a new thrift client builder that inherits the given configuration. 
+ * This is useful for customizing a client for a particular thrift call that makes sense to treat + * differently from the rest of the calls to a given service. + */ + public ClientBuilder builder(Config config) { + Preconditions.checkNotNull(config); + return new ClientBuilder(config); + } + + /** + * Creates a new client using the default configuration specified for this Thrift instance. + */ + public T create() { + return createClient(defaultConfig); + } + + private T createClient(Config config) { + StatsProvider statsProvider = config.getStatsProvider(); + + // lease/call/[invalidate]/release + boolean debug = config.isDebug(); + + Caller decorated = new ThriftCaller<T>(connectionPool, requestTracker, clientFactory, + config.getConnectTimeout(), debug); + + // [retry] + if (config.getMaxRetries() > 0) { + decorated = new RetryingCaller(decorated, async, statsProvider, serviceName, + config.getMaxRetries(), config.getRetryableExceptions(), debug); + } + + // [deadline] + if (config.getRequestTimeout().getValue() > 0) { + Preconditions.checkArgument(!async, + "Request deadlines may not be used with an asynchronous client."); + + decorated = new DeadlineCaller(decorated, async, executorService, config.getRequestTimeout()); + } + + // [debug] + if (debug) { + decorated = new DebugCaller(decorated, async); + } + + // stats + if (config.enableStats()) { + decorated = new StatTrackingCaller(decorated, async, statsProvider, serviceName); + } + + final Caller caller = decorated; + + final InvocationHandler invocationHandler = new InvocationHandler() { + @Override + public Object invoke(Object o, Method method, Object[] args) throws Throwable { + AsyncMethodCallback callback = null; + if (args != null && async) { + List<Object> argsList = Lists.newArrayList(args); + callback = extractCallback(argsList); + args = argsList.toArray(); + } + + return caller.call(method, args, callback, null); + } + }; + + @SuppressWarnings("unchecked") + T instance = (T) 
Proxy.newProxyInstance(serviceInterface.getClassLoader(), + new Class<?>[] {serviceInterface}, invocationHandler); + return instance; + } + + /** + * Verifies that the final argument in a list of objects is a fully-formed + * {@link AsyncMethodCallback} and extracts it, removing it from the argument list. + * + * @param args Argument list to remove the callback from. + * @return The callback extracted from {@code args}. + */ + private static AsyncMethodCallback extractCallback(List<Object> args) { + // TODO(William Farner): Check all interface methods when building the Thrift client + // and verify that last arguments are all callbacks...this saves us from checking + // each time. + + // Check that the last argument is a callback. + Preconditions.checkArgument(args.size() > 0); + Object lastArg = args.get(args.size() - 1); + Preconditions.checkArgument(lastArg instanceof AsyncMethodCallback, + "Last argument of an async thrift call is expected to be of type AsyncMethodCallback."); + + return (AsyncMethodCallback) args.remove(args.size() - 1); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/86a547b9/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java new file mode 100644 index 0000000..a1b79b0 --- /dev/null +++ b/commons/src/main/java/com/twitter/common/thrift/ThriftConnectionFactory.java @@ -0,0 +1,369 @@ +// ================================================================================================= +// Copyright 2011 Twitter, Inc. +// ------------------------------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this work except in compliance with the License. 
+// You may obtain a copy of the License in the LICENSE file, or at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ================================================================================================= + +package com.twitter.common.thrift; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.twitter.common.base.Closure; +import com.twitter.common.base.Closures; +import com.twitter.common.base.MorePreconditions; +import com.twitter.common.net.pool.Connection; +import com.twitter.common.net.pool.ConnectionFactory; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; + +/** + * A connection factory for thrift transport connections to a given host. 
This connection factory + * is lazy and will only create a configured maximum number of active connections - where a + * {@link ConnectionFactory#create(com.twitter.common.quantity.Amount) created} connection that has + * not been {@link #destroy destroyed} is considered active. + * + * @author John Sirois + */ +public class ThriftConnectionFactory + implements ConnectionFactory<Connection<TTransport, InetSocketAddress>> { + + public enum TransportType { + BLOCKING, FRAMED, NONBLOCKING; + + /** + * Async clients implicitly use a framed transport, requiring the server they connect to to do + * the same. This prevents specifying a nonblocking client without a framed transport, since + * that is not compatible with thrift and would simply cause the client to blow up when making a + * request. Instead, you must explicitly say useFramedTransport(true) for any buildAsync(). + */ + public static TransportType get(boolean framedTransport, boolean nonblocking) { + if (nonblocking) { + Preconditions.checkArgument(framedTransport, + "nonblocking client requires a server running framed transport"); + return NONBLOCKING; + } + + return framedTransport ? 
FRAMED : BLOCKING; + } + } + + private static InetSocketAddress asEndpoint(String host, int port) { + MorePreconditions.checkNotBlank(host); + Preconditions.checkArgument(port > 0); + return InetSocketAddress.createUnresolved(host, port); + } + + private InetSocketAddress endpoint; + private final int maxConnections; + private final TransportType transportType; + private final Amount<Long, Time> socketTimeout; + private final Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback; + private boolean sslTransport = false; + + private final Set<Connection<TTransport, InetSocketAddress>> activeConnections = + Sets.newSetFromMap( + Maps.<Connection<TTransport, InetSocketAddress>, Boolean>newIdentityHashMap()); + private volatile int lastActiveConnectionsSize = 0; + + private final Lock activeConnectionsWriteLock = new ReentrantLock(true); + + /** + * Creates a thrift connection factory with a plain socket (non-framed transport). + * This is the same as calling {@link #ThriftConnectionFactory(String, int, int, boolean)} with + * {@code framedTransport} set to {@code false}. + * + * @param host Host to connect to. + * @param port Port to connect on. + * @param maxConnections Maximum number of connections for this host:port. + */ + public ThriftConnectionFactory(String host, int port, int maxConnections) { + this(host, port, maxConnections, TransportType.BLOCKING); + } + + /** + * Creates a thrift connection factory. + * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, + * otherwise a raw {@link TSocket} will be used. + * + * @param host Host to connect to. + * @param port Port to connect on. + * @param maxConnections Maximum number of connections for this host:port. + * @param framedTransport Whether to use framed or blocking transport. 
+ */ + public ThriftConnectionFactory(String host, int port, int maxConnections, + boolean framedTransport) { + + this(asEndpoint(host, port), maxConnections, TransportType.get(framedTransport, false)); + } + + /** + * Creates a thrift connection factory. + * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, + * otherwise a raw {@link TSocket} will be used. + * + * @param endpoint Endpoint to connect to. + * @param maxConnections Maximum number of connections for this host:port. + * @param framedTransport Whether to use framed or blocking transport. + */ + public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, + boolean framedTransport) { + + this(endpoint, maxConnections, TransportType.get(framedTransport, false)); + } + + /** + * Creates a thrift connection factory. + * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, + * otherwise a raw {@link TSocket} will be used. + * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, + * otherwise a raw {@link TSocket} will be used. + * Timeouts are ignored when nonblocking transport is used. + * + * @param host Host to connect to. + * @param port Port to connect on. + * @param maxConnections Maximum number of connections for this host:port. + * @param transportType Whether to use normal blocking, framed blocking, or non-blocking + * (implicitly framed) transport. + */ + public ThriftConnectionFactory(String host, int port, int maxConnections, + TransportType transportType) { + this(host, port, maxConnections, transportType, null); + } + + /** + * Creates a thrift connection factory. + * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, + * otherwise a raw {@link TSocket} will be used. + * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, + * otherwise a raw {@link TSocket} will be used. 
+ * Timeouts are ignored when nonblocking transport is used. + * + * @param host Host to connect to. + * @param port Port to connect on. + * @param maxConnections Maximum number of connections for this host:port. + * @param transportType Whether to use normal blocking, framed blocking, or non-blocking + * (implicitly framed) transport. + * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o + * the blocking client. + */ + public ThriftConnectionFactory(String host, int port, int maxConnections, + TransportType transportType, Amount<Long, Time> socketTimeout) { + this(asEndpoint(host, port), maxConnections, transportType, socketTimeout); + } + + public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, + TransportType transportType) { + this(endpoint, maxConnections, transportType, null); + } + + /** + * Creates a thrift connection factory. + * If {@code framedTransport} is set to {@code true}, {@link TFramedTransport} will be used, + * otherwise a raw {@link TSocket} will be used. + * If {@code nonblocking} is set to {@code true}, {@link TNonblockingSocket} will be used, + * otherwise a raw {@link TSocket} will be used. + * Timeouts are ignored when nonblocking transport is used. + * + * @param endpoint Endpoint to connect to. + * @param maxConnections Maximum number of connections for this host:port. + * @param transportType Whether to use normal blocking, framed blocking, or non-blocking + * (implicitly framed) transport. + * @param socketTimeout timeout on thrift i/o operations, or null to default to connectTimeout o + * the blocking client. 
+ */ + public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, + TransportType transportType, Amount<Long, Time> socketTimeout) { + this(endpoint, maxConnections, transportType, socketTimeout, + Closures.<Connection<TTransport, InetSocketAddress>>noop(), false); + } + + public ThriftConnectionFactory(InetSocketAddress endpoint, int maxConnections, + TransportType transportType, Amount<Long, Time> socketTimeout, + Closure<Connection<TTransport, InetSocketAddress>> postCreateCallback, + boolean sslTransport) { + Preconditions.checkArgument(maxConnections > 0, "maxConnections must be at least 1"); + if (socketTimeout != null) { + Preconditions.checkArgument(socketTimeout.as(Time.MILLISECONDS) >= 0); + } + + this.endpoint = Preconditions.checkNotNull(endpoint); + this.maxConnections = maxConnections; + this.transportType = transportType; + this.socketTimeout = socketTimeout; + this.postCreateCallback = Preconditions.checkNotNull(postCreateCallback); + this.sslTransport = sslTransport; + } + + @Override + public boolean mightCreate() { + return lastActiveConnectionsSize < maxConnections; + } + + /** + * FIXME: shouldn't this throw TimeoutException instead of returning null + * in the timeout cases as per the ConnectionFactory.create javadoc? 
+ */ + @Override + public Connection<TTransport, InetSocketAddress> create(Amount<Long, Time> timeout) + throws TTransportException, IOException { + + Preconditions.checkNotNull(timeout); + if (timeout.getValue() == 0) { + return create(); + } + + try { + long timeRemainingNs = timeout.as(Time.NANOSECONDS); + long start = System.nanoTime(); + if(activeConnectionsWriteLock.tryLock(timeRemainingNs, TimeUnit.NANOSECONDS)) { + try { + if (!willCreateSafe()) { + return null; + } + + timeRemainingNs -= (System.nanoTime() - start); + + return createConnection((int) TimeUnit.NANOSECONDS.toMillis(timeRemainingNs)); + } finally { + activeConnectionsWriteLock.unlock(); + } + } else { + return null; + } + } catch (InterruptedException e) { + return null; + } + } + + private Connection<TTransport, InetSocketAddress> create() + throws TTransportException, IOException { + activeConnectionsWriteLock.lock(); + try { + if (!willCreateSafe()) { + return null; + } + + return createConnection(0); + } finally { + activeConnectionsWriteLock.unlock(); + } + } + + private Connection<TTransport, InetSocketAddress> createConnection(int timeoutMillis) + throws TTransportException, IOException { + TTransport transport = createTransport(timeoutMillis); + if (transport == null) { + return null; + } + + Connection<TTransport, InetSocketAddress> connection = + new TTransportConnection(transport, endpoint); + postCreateCallback.execute(connection); + activeConnections.add(connection); + lastActiveConnectionsSize = activeConnections.size(); + return connection; + } + + private boolean willCreateSafe() { + return activeConnections.size() < maxConnections; + } + + @VisibleForTesting + TTransport createTransport(int timeoutMillis) throws TTransportException, IOException { + TSocket socket = null; + if (transportType != TransportType.NONBLOCKING) { + // can't do a nonblocking create on a blocking transport + if (timeoutMillis <= 0) { + return null; + } + + if (sslTransport) { + SSLSocketFactory factory 
= (SSLSocketFactory) SSLSocketFactory.getDefault(); + SSLSocket ssl_socket = (SSLSocket) factory.createSocket(endpoint.getHostName(), endpoint.getPort()); + ssl_socket.setSoTimeout(timeoutMillis); + return new TSocket(ssl_socket); + } else { + socket = new TSocket(endpoint.getHostName(), endpoint.getPort(), timeoutMillis); + } + } + + try { + switch (transportType) { + case BLOCKING: + socket.open(); + setSocketTimeout(socket); + return socket; + case FRAMED: + TFramedTransport transport = new TFramedTransport(socket); + transport.open(); + setSocketTimeout(socket); + return transport; + case NONBLOCKING: + try { + return new TNonblockingSocket(endpoint.getHostName(), endpoint.getPort()); + } catch (IOException e) { + throw new IOException("Failed to create non-blocking transport to " + endpoint, e); + } + } + } catch (TTransportException e) { + throw new TTransportException("Failed to create transport to " + endpoint, e); + } + + throw new IllegalArgumentException("unknown transport type " + transportType); + } + + private void setSocketTimeout(TSocket socket) { + if (socketTimeout != null) { + socket.setTimeout(socketTimeout.as(Time.MILLISECONDS).intValue()); + } + } + + @Override + public void destroy(Connection<TTransport, InetSocketAddress> connection) { + activeConnectionsWriteLock.lock(); + try { + boolean wasActiveConnection = activeConnections.remove(connection); + Preconditions.checkArgument(wasActiveConnection, + "connection %s not created by this factory", connection); + lastActiveConnectionsSize = activeConnections.size(); + } finally { + activeConnectionsWriteLock.unlock(); + } + + // We close the connection outside the critical section which means we may have more connections + // "active" (open) than maxConnections for a very short time + connection.close(); + } + + @Override + public String toString() { + return String.format("%s[%s]", getClass().getSimpleName(), endpoint); + } +} 
/**
 * Exception class to wrap exceptions caught during thrift calls.
 */
public class ThriftException extends Exception {

  // Declared explicitly so the serialized form does not depend on a compiler-derived id.
  private static final long serialVersionUID = 1L;

  /**
   * Creates an exception with a description and no cause.
   *
   * @param message description of the failure
   */
  public ThriftException(String message) {
    super(message);
  }

  /**
   * Creates an exception with a description that wraps the underlying failure.
   *
   * @param message description of the failure
   * @param t the exception caught during the thrift call
   */
  public ThriftException(String message, Throwable t) {
    super(message, t);
  }
}
