http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java deleted file mode 100644 index 514e665..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/StatTrackingCaller.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.callers; - -import java.lang.reflect.Method; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.annotation.Nullable; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -import org.apache.aurora.common.thrift.TResourceExhaustedException; -import org.apache.aurora.common.thrift.TTimeoutException; -import org.apache.thrift.async.AsyncMethodCallback; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.common.stats.StatsProvider.RequestTimer; - -/** - * A caller that exports statistics about calls made to the wrapped caller. 
- * - * @author William Farner - */ -public class StatTrackingCaller extends CallerDecorator { - - private final StatsProvider statsProvider; - private final String serviceName; - - private final LoadingCache<Method, RequestTimer> stats = - CacheBuilder.newBuilder().build(new CacheLoader<Method, RequestTimer>() { - @Override public RequestTimer load(Method method) { - // Thrift does not support overloads - so just the name disambiguates all calls. - return statsProvider.makeRequestTimer(serviceName + "_" + method.getName()); - } - }); - - /** - * Creates a new stat tracking caller, which will export stats to the given {@link StatsProvider}. - * - * @param decoratedCaller The caller to decorate with a deadline. - * @param async Whether the caller is asynchronous. - * @param statsProvider The stat provider to export statistics to. - * @param serviceName The name of the service that methods are being called on. - */ - public StatTrackingCaller(Caller decoratedCaller, boolean async, StatsProvider statsProvider, - String serviceName) { - super(decoratedCaller, async); - - this.statsProvider = statsProvider; - this.serviceName = serviceName; - } - - @Override - public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable { - final RequestTimer requestStats = stats.get(method); - final long startTime = System.nanoTime(); - - ResultCapture capture = new ResultCapture() { - @Override public void success() { - requestStats.requestComplete(TimeUnit.NANOSECONDS.toMicros( - System.nanoTime() - startTime)); - } - - @Override public boolean fail(Throwable t) { - // TODO(John Sirois): the ruby client reconnects for timeouts too - this provides a natural - // backoff mechanism - consider how to plumb something similar. 
- if (t instanceof TTimeoutException || t instanceof TimeoutException) { - requestStats.incTimeouts(); - return true; - } - - // TODO(John Sirois): consider ditching reconnects since its nearly redundant with errors as - // it stands. - if (!(t instanceof TResourceExhaustedException)) { - requestStats.incReconnects(); - } - // TODO(John Sirois): provide more detailed stats: track counts for distinct exceptions types, - // track retries-per-method, etc... - requestStats.incErrors(); - return true; - } - }; - - return invoke(method, args, callback, capture, connectTimeoutOverride); - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java b/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java deleted file mode 100644 index 4e62940..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/callers/ThriftCaller.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift.callers; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import org.apache.aurora.common.net.pool.Connection; -import org.apache.aurora.common.net.pool.ObjectPool; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.net.pool.ResourceExhaustedException; -import org.apache.aurora.common.thrift.TResourceExhaustedException; -import org.apache.aurora.common.thrift.TTimeoutException; -import org.apache.aurora.common.net.loadbalancing.RequestTracker; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.transport.TTransport; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.List; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; - -/** - * A caller that issues calls to a target that is assumed to be a client to a thrift service. - * - * @author William Farner - */ -public class ThriftCaller<T> implements Caller { - private static final Logger LOG = Logger.getLogger(ThriftCaller.class.getName()); - - private final ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool; - private final RequestTracker<InetSocketAddress> requestTracker; - private final Function<TTransport, T> clientFactory; - private final Amount<Long, Time> timeout; - private final boolean debug; - - /** - * Creates a new thrift caller. - * - * @param connectionPool The connection pool to use. - * @param requestTracker The request tracker to nofify of request results. - * @param clientFactory Factory to use for building client object instances. - * @param timeout The timeout to use when requesting objects from the connection pool. - * @param debug Whether to use the caller in debug mode. 
- */ - public ThriftCaller(ObjectPool<Connection<TTransport, InetSocketAddress>> connectionPool, - RequestTracker<InetSocketAddress> requestTracker, Function<TTransport, T> clientFactory, - Amount<Long, Time> timeout, boolean debug) { - - this.connectionPool = connectionPool; - this.requestTracker = requestTracker; - this.clientFactory = clientFactory; - this.timeout = timeout; - this.debug = debug; - } - - @Override - public Object call(Method method, Object[] args, @Nullable AsyncMethodCallback callback, - @Nullable Amount<Long, Time> connectTimeoutOverride) throws Throwable { - - final Connection<TTransport, InetSocketAddress> connection = getConnection(connectTimeoutOverride); - final long startNanos = System.nanoTime(); - - ResultCapture capture = new ResultCapture() { - @Override public void success() { - try { - requestTracker.requestResult(connection.getEndpoint(), - RequestTracker.RequestResult.SUCCESS, System.nanoTime() - startNanos); - } finally { - connectionPool.release(connection); - } - } - - @Override public boolean fail(Throwable t) { - if (debug) { - LOG.warning(String.format("Call to endpoint: %s failed: %s", connection, t)); - } - - try { - requestTracker.requestResult(connection.getEndpoint(), - RequestTracker.RequestResult.FAILED, System.nanoTime() - startNanos); - } finally { - connectionPool.remove(connection); - } - return true; - } - }; - - return invokeMethod(clientFactory.apply(connection.get()), method, args, callback, capture); - } - - private static Object invokeMethod(Object target, Method method, Object[] args, - AsyncMethodCallback callback, final ResultCapture capture) throws Throwable { - - // Swap the wrapped callback out for ours. 
- if (callback != null) { - callback = new WrappedMethodCallback(callback, capture); - - List<Object> argsList = Lists.newArrayList(args); - argsList.add(callback); - args = argsList.toArray(); - } - - try { - Object result = method.invoke(target, args); - if (callback == null) capture.success(); - - return result; - } catch (InvocationTargetException t) { - // We allow this one to go to both sync and async captures. - if (callback != null) { - callback.onError((Exception) t.getCause()); - return null; - } else { - capture.fail(t.getCause()); - throw t.getCause(); - } - } - } - - private Connection<TTransport, InetSocketAddress> getConnection( - Amount<Long, Time> connectTimeoutOverride) - throws TResourceExhaustedException, TTimeoutException { - try { - Connection<TTransport, InetSocketAddress> connection; - if (connectTimeoutOverride != null) { - connection = connectionPool.get(connectTimeoutOverride); - } else { - connection = (timeout.getValue() > 0) - ? connectionPool.get(timeout) : connectionPool.get(); - } - - if (connection == null) { - throw new TResourceExhaustedException("no connection was available"); - } - return connection; - } catch (ResourceExhaustedException e) { - throw new TResourceExhaustedException(e); - } catch (TimeoutException e) { - throw new TTimeoutException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java deleted file mode 100644 index a14f53a4..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredNonblockingServerSocket.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed under the 
Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.monitoring; - -import com.google.common.base.Preconditions; -import org.apache.aurora.common.net.monitoring.ConnectionMonitor; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TTransportException; - -import java.net.InetSocketAddress; - -/** - * Extension of TNonblockingServerSocket that allows for tracking of connected clients. 
- * - * @author William Farner - */ -public class TMonitoredNonblockingServerSocket extends TNonblockingServerSocket { - private final ConnectionMonitor monitor; - - public TMonitoredNonblockingServerSocket(int port, ConnectionMonitor monitor) - throws TTransportException { - super(port); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredNonblockingServerSocket(int port, int clientTimeout, ConnectionMonitor monitor) - throws TTransportException { - super(port, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, ConnectionMonitor monitor) - throws TTransportException { - super(bindAddr); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, - ConnectionMonitor monitor) throws TTransportException { - super(bindAddr, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - @Override - protected TNonblockingSocket acceptImpl() throws TTransportException { - /* TODO(William Farner): Finish implementing...may require an object proxy. 
- final TNonblockingSocket socket = super.acceptImpl(); - - TNonblockingSocket wrappedSocket = new TNonblockingSocket(socket.get) { - @Override public void close() { - super.close(); - monitor.disconnected(this); - } - }; - - monitor.connected(wrappedSocket, socket.getSocket().getInetAddress()); - - return wrappedSocket; - - */ - return super.acceptImpl(); - } - - @Override - public void close() { - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java deleted file mode 100644 index a0d7d5f..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredProcessor.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift.monitoring; - -import com.google.common.base.Preconditions; -import org.apache.aurora.common.net.loadbalancing.RequestTracker; -import org.apache.thrift.TException; -import org.apache.thrift.TProcessor; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSocket; - -import java.net.InetSocketAddress; - -import static org.apache.aurora.common.net.loadbalancing.RequestTracker.RequestResult.*; - -/** - * A TProcessor that joins a wrapped TProcessor with a monitor. - * - * @author William Farner - */ -public class TMonitoredProcessor implements TProcessor { - private final TProcessor wrapped; - private final TMonitoredServerSocket monitoredServerSocket; - private final RequestTracker<InetSocketAddress> monitor; - - public TMonitoredProcessor(TProcessor wrapped, TMonitoredServerSocket monitoredServerSocket, - RequestTracker<InetSocketAddress> monitor) { - this.wrapped = Preconditions.checkNotNull(wrapped); - this.monitoredServerSocket = Preconditions.checkNotNull(monitoredServerSocket); - this.monitor = Preconditions.checkNotNull(monitor); - } - - @Override - public boolean process(TProtocol in, TProtocol out) throws TException { - long startNanos = System.nanoTime(); - boolean exceptionThrown = false; - try { - return wrapped.process(in, out); - } catch (TException e) { - exceptionThrown = true; - throw e; - } finally { - InetSocketAddress address = monitoredServerSocket.getAddress((TSocket) in.getTransport()); - Preconditions.checkState(address != null, - "Address unknown for transport " + in.getTransport()); - - monitor.requestResult(address, exceptionThrown ? 
FAILED : SUCCESS, - System.nanoTime() - startNanos); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java deleted file mode 100644 index f4405c4..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/monitoring/TMonitoredServerSocket.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.thrift.monitoring; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import org.apache.aurora.common.net.monitoring.ConnectionMonitor; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransportException; - -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.Collections; -import java.util.Map; - -/** - * Extension of TServerSocket that allows for tracking of connected clients. 
- * - * @author William Farner - */ -public class TMonitoredServerSocket extends TServerSocket { - private ConnectionMonitor<InetSocketAddress> monitor; - - public TMonitoredServerSocket(ServerSocket serverSocket, - ConnectionMonitor<InetSocketAddress> monitor) { - super(serverSocket); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(ServerSocket serverSocket, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) { - super(serverSocket, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(int port, ConnectionMonitor<InetSocketAddress> monitor) - throws TTransportException { - super(port); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(int port, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(port, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(InetSocketAddress bindAddr, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(bindAddr); - this.monitor = Preconditions.checkNotNull(monitor); - } - - public TMonitoredServerSocket(InetSocketAddress bindAddr, int clientTimeout, - ConnectionMonitor<InetSocketAddress> monitor) throws TTransportException { - super(bindAddr, clientTimeout); - this.monitor = Preconditions.checkNotNull(monitor); - } - - private final Map<TSocket, InetSocketAddress> addressMap = - Collections.synchronizedMap(Maps.<TSocket, InetSocketAddress>newHashMap()); - - public InetSocketAddress getAddress(TSocket socket) { - return addressMap.get(socket); - } - - @Override - protected TSocket acceptImpl() throws TTransportException { - final TSocket socket = super.acceptImpl(); - final InetSocketAddress remoteAddress = - (InetSocketAddress) socket.getSocket().getRemoteSocketAddress(); - - TSocket monitoredSocket = new TSocket(socket.getSocket()) { - boolean 
closed = false; - - @Override public void close() { - try { - super.close(); - } finally { - if (!closed) { - monitor.released(remoteAddress); - addressMap.remove(this); - } - closed = true; - } - } - }; - - addressMap.put(monitoredSocket, remoteAddress); - - monitor.connected(remoteAddress); - return monitoredSocket; - } - - @Override - public void close() { - super.close(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java b/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java deleted file mode 100644 index 2cec711..0000000 --- a/commons/src/main/java/org/apache/aurora/common/thrift/testing/MockTSocket.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.thrift.testing; - -import org.apache.thrift.transport.TSocket; - -/** - * @author William Farner - */ -public class MockTSocket extends TSocket { - public static final String HOST = "dummyHost"; - public static final int PORT = 1000; - - private boolean connected = false; - - public MockTSocket() { - super(HOST, PORT); - } - - @Override - public void open() { - connected = true; - // TODO(William Farner): Allow for failure injection here by throwing TTransportException. - } - - @Override - public boolean isOpen() { - return connected; - } - - public void close() { - connected = false; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java b/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java index 377228f..1699416 100644 --- a/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java +++ b/commons/src/main/java/org/apache/aurora/common/thrift/testing/TestThriftTypes.java @@ -13,8 +13,12 @@ */ package org.apache.aurora.common.thrift.testing; +import java.util.Map; +import java.util.Map.Entry; + import com.google.common.base.Preconditions; import com.google.common.collect.Maps; + import org.apache.thrift.TBase; import org.apache.thrift.TBaseHelper; import org.apache.thrift.TException; @@ -24,9 +28,6 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TStruct; import org.apache.thrift.protocol.TType; -import java.util.Map; -import java.util.Map.Entry; - /** * Hand-coded thrift types for use in tests. 
* http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java b/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java deleted file mode 100644 index eb20925..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/DateUtils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.Calendar; -import java.util.Date; -import java.util.concurrent.TimeUnit; - -/** - * Utilities for working with java {@link Date}s. 
- * - * @author John Sirois - */ -public final class DateUtils { - - public static Date now() { - return new Date(); - } - - public static long toUnixTime(Date date) { - return toUnixTime(date.getTime()); - } - - public static long nowUnixTime() { - return toUnixTime(System.currentTimeMillis()); - } - - public static long toUnixTime(long millisSinceEpoch) { - return TimeUnit.MILLISECONDS.toSeconds(millisSinceEpoch); - } - - public static Date ago(int calendarField, int amount) { - return ago(now(), calendarField, amount); - } - - public static Date ago(Date referenceDate, int calendarField, int amount) { - Calendar calendar = Calendar.getInstance(); - calendar.setTime(referenceDate); - calendar.add(calendarField, -1 * amount); - return calendar.getTime(); - } - - private DateUtils() { - // utility - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/FileUtils.java b/commons/src/main/java/org/apache/aurora/common/util/FileUtils.java deleted file mode 100644 index 9b23ee0..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/FileUtils.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.util; - -import java.io.File; - -/** - * Utilities for working with Files - * - * @author Florian Leibert - */ -public final class FileUtils { - - private FileUtils() { - } - - /** - * recursively deletes the path and all it's content and returns true if it succeeds - * Note that the content could be partially deleted and the method return false - * - * @param path the path to delete - * @return true if the path was deleted - */ - public static boolean forceDeletePath(File path) { - if (path == null) { - return false; - } - if (path.exists() && path.isDirectory()) { - File[] files = path.listFiles(); - for (File file : files) { - if (file.isDirectory()) { - forceDeletePath(file); - } else { - file.delete(); - } - } - } - return path.delete(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java b/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java deleted file mode 100644 index 69fb9ed..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/LowResClock.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import java.io.Closeable; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -/** - * Low resolution implementation of a {@link Clock}, - * optimized for fast reads at the expense of precision. - * It works by caching the result of the system clock for a - * {@code resolution} amount of time. - */ -public class LowResClock implements Clock, Closeable { - private static final ScheduledExecutorService GLOBAL_SCHEDULER = - Executors.newScheduledThreadPool(1, new ThreadFactory() { - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "LowResClock"); - t.setDaemon(true); - return t; - } - }); - - private volatile long time; - private final ScheduledFuture<?> updaterHandler; - private final Clock underlying; - - @VisibleForTesting - LowResClock(Amount<Long, Time> resolution, ScheduledExecutorService executor, Clock clock) { - long sleepTimeMs = resolution.as(Time.MILLISECONDS); - Preconditions.checkArgument(sleepTimeMs > 0); - underlying = clock; - Runnable ticker = new Runnable() { - @Override public void run() { - time = underlying.nowMillis(); - } - }; - - // Ensure the constructing thread sees a LowResClock with a valid (low-res) time by executing a - // blocking call now. - ticker.run(); - - updaterHandler = - executor.scheduleAtFixedRate(ticker, sleepTimeMs, sleepTimeMs, TimeUnit.MILLISECONDS); - } - - - /** - * Construct a LowResClock which wraps the system clock. - * This constructor will also schedule a periodic task responsible for - * updating the time every {@code resolution}. 
- */ - public LowResClock(Amount<Long, Time> resolution) { - this(resolution, GLOBAL_SCHEDULER, Clock.SYSTEM_CLOCK); - } - - /** - * Terminate the underlying updater task. - * Any subsequent usage of the clock will throw an {@link IllegalStateException}. - */ - public void close() { - updaterHandler.cancel(true); - } - - @Override - public long nowMillis() { - checkNotClosed(); - return time; - } - - @Override - public long nowNanos() { - return nowMillis() * 1000 * 1000; - } - - @Override - public void waitFor(long millis) throws InterruptedException { - checkNotClosed(); - underlying.waitFor(millis); - } - - private void checkNotClosed() { - if (updaterHandler.isCancelled()) { - throw new IllegalStateException("LowResClock invoked after being closed!"); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java b/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java deleted file mode 100644 index 0747e7a..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/ParsingUtil.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.util; - -import com.google.common.base.Preconditions; - -import org.apache.aurora.common.collections.Pair; - -/** - * Common methods for parsing configs. - * - * @author John Sirois - */ -public class ParsingUtil { - /** - * Parses a string as a range between one integer and another. The integers must be separated by - * a hypen character (space padding is acceptable). Additionally, the first integer - * (left-hand side) must be less than or equal to the second (right-hand side). - * - * @param rangeString The string to parse as an integer range. - * @return A pair of the parsed integers. - */ - public static Pair<Integer, Integer> parseRange(String rangeString) { - if (rangeString == null) return null; - - String[] startEnd = rangeString.split("-"); - Preconditions.checkState( - startEnd.length == 2, "Shard range format: start-end (e.g. 1-4)"); - int start; - int end; - try { - start = Integer.parseInt(startEnd[0].trim()); - end = Integer.parseInt(startEnd[1].trim()); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Failed to parse shard range.", e); - } - - Preconditions.checkState( - start <= end, "The left-hand side of a shard range must be <= the right-hand side."); - return Pair.of(start, end); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java b/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java deleted file mode 100644 index 243cf33..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/QueueDrainer.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; - -import com.google.common.base.Preconditions; - -/** - * Joins a task queue with an executor service, to add control over when - * tasks are actually made available for execution. - * - * @author Srinivasan Rajagopal - */ -public class QueueDrainer<T extends Runnable> implements Runnable { - - private final Executor taskExecutor; - private final BlockingQueue<T> blockingQueue; - - /** - * Creates a QueueDrainer that associates the queue with an executorService. - * - * @param taskExecutor Executor to execute a task if present. - * @param blockingQueue Queue to poll if there is a runnable to execute. - */ - public QueueDrainer(Executor taskExecutor, BlockingQueue<T> blockingQueue) { - this.taskExecutor = Preconditions.checkNotNull(taskExecutor); - this.blockingQueue = Preconditions.checkNotNull(blockingQueue); - } - - /** - * Picks tasks from the Queue to execute if present else no-op. 
- */ - @Override - public void run() { - Runnable command = blockingQueue.poll(); - if (command != null) { - taskExecutor.execute(command); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java b/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java deleted file mode 100644 index 3e4de30..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/RangeNormalizer.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -//************************************************************************ -// -// Summize -// -// This work protected by US Copyright Law and contains proprietary and -// confidential trade secrets. -// -// (c) Copyright 2006 Summize, ALL RIGHTS RESERVED. -// -//************************************************************************ -package org.apache.aurora.common.util; - -/** - * Generic range normalizer class. Values must be positive. 
/**
 * Generic range normalizer class: linearly maps a value from a source range A onto a target
 * range B. Values must be positive.
 *
 * @author Abdur Chowdhury
 */
public class RangeNormalizer {
  // Lower bound and width of the source range A.
  private final double minA;
  private final double denominator;

  // Lower bound, width and midpoint of the target range B.
  private final double minB;
  private final double rangeB;
  private final double midB;

  /**
   * Creates a normalizer mapping the range [{@code minA}, {@code maxA}] onto
   * [{@code minB}, {@code maxB}].
   *
   * @param minA Lower bound of the source range.
   * @param maxA Upper bound of the source range.
   * @param minB Lower bound of the target range.
   * @param maxB Upper bound of the target range.
   */
  public RangeNormalizer(double minA, double maxA, double minB, double maxB) {
    this.minA = minA;
    this.minB = minB;
    this.denominator = maxA - minA;
    this.rangeB = maxB - minB;
    this.midB = minB + (rangeB / 2.0);
  }

  /**
   * Maps {@code value} from the source range onto the target range.
   *
   * @param value The value to normalize.
   * @return The mapped value, or the midpoint of the target range when the source range is
   *     empty ({@code minA == maxA}).
   */
  public double normalize(double value) {
    // if no input range, return a mid range value
    if (denominator == 0) {
      return midB;
    }

    return ((value - minA) / denominator) * rangeB + minB;
  }

  /**
   * One-shot variant of {@link #normalize(double)}. Note that, unlike the instance method, an
   * empty source range yields {@code minB} rather than the midpoint of the target range.
   *
   * @param value The value to normalize.
   * @param minA Lower bound of the source range.
   * @param maxA Upper bound of the source range.
   * @param minB Lower bound of the target range.
   * @param maxB Upper bound of the target range.
   * @return The mapped value, or {@code minB} when {@code minA == maxA}.
   */
  public static double normalize(double value, double minA, double maxA, double minB, double maxB) {
    // if the source min and max are equal, don't return 0, return something
    // in the target range (perhaps this "default" should be another argument)
    if (minA == maxA) {
      return minB;
    }

    return ((value - minA) / (maxA - minA)) * (maxB - minB) + minB;
  }

  /**
   * Rounds {@code rating} to the nearest integer and then snaps it onto a coarser step scale:
   * 2 becomes 1, 4 becomes 3, 6 becomes 5, 8 becomes 7 and 9 becomes 10. Every other rounded
   * value is returned unchanged.
   *
   * @param rating The rating to snap.
   * @return The snapped rating as a float.
   */
  public static float normalizeToStepDistribution(double rating) {
    int integerRating = (int) Math.round(rating);

    switch (integerRating) {
      case 2:
      case 4:
      case 6:
      case 8:
        // Even ratings below 9 collapse onto the odd step just beneath them.
        return (float) (integerRating - 1);
      case 9:
        return 10f;
      default:
        return (float) integerRating;
    }
  }
}
b/commons/src/main/java/org/apache/aurora/common/util/RateLimitedCommandExecutor.java deleted file mode 100644 index 05b3c5f..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/RateLimitedCommandExecutor.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * CommandExecutor that invokes {@code queueDrainer} with a best-effort - * mechanism to execute with a fixed interval between requests of {@code - * intervalBetweenRequests}. - * - * @author Srinivasan Rajagopal - */ -public class RateLimitedCommandExecutor implements CommandExecutor { - - private static final Logger LOG = Logger.getLogger(RateLimitedCommandExecutor.class.getName()); - - private final BlockingQueue<RetryingRunnable<?>> blockingQueue; - - /** - * Create a CommandExecutor that executes enquequed tasks in the task - * executor with specified interval between executions. 
- * - * @param taskExecutor executor for periodic execution of enqueued tasks. - * @param intervalBetweenRequests interval between requests to rate limit - * request rate. - * @param queueDrainer A runnable that is responsible for draining the queue. - * @param blockingQueue Queue to keep outstanding work in. - */ - public RateLimitedCommandExecutor( - ScheduledExecutorService taskExecutor, - Amount<Long, Time> intervalBetweenRequests, - Runnable queueDrainer, - BlockingQueue<RetryingRunnable<?>> blockingQueue) { - - checkNotNull(taskExecutor); - checkNotNull(intervalBetweenRequests); - checkArgument(intervalBetweenRequests.as(Time.MILLISECONDS) > 0); - checkNotNull(queueDrainer); - this.blockingQueue = checkNotNull(blockingQueue); - taskExecutor.scheduleWithFixedDelay( - getSafeRunner(queueDrainer), - 0, - intervalBetweenRequests.as(Time.MILLISECONDS), - TimeUnit.MILLISECONDS); - } - - private static Runnable getSafeRunner(final Runnable runnable) { - return new Runnable() { - @Override public void run() { - try { - runnable.run(); - } catch (RuntimeException t) { - LOG.log(Level.INFO, " error processing task " + runnable); - } - } - }; - } - - @Override - public <E extends Exception> void execute(String name, ExceptionalCommand<E> task, - Class<E> exceptionClass, int numTries, Amount<Long, Time> retryDelay) { - blockingQueue.add(new RetryingRunnable<E>(name, task, exceptionClass, - numTries, retryDelay, this)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java b/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java deleted file mode 100644 index 2f66d13..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/RetryingRunnable.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed under the 
Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.util; - -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.common.base.Throwables; - -import org.apache.commons.lang.builder.ToStringBuilder; - -import org.apache.aurora.common.base.ExceptionalCommand; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * A runnable task that is retried in a user-configurable fashion. - * - * @param <E> The type of exception that the ExceptionalCommand throws. - * - * @author Utkarsh Srivastava - */ -public class RetryingRunnable<E extends Exception> implements Runnable { - private final String name; - private final int tryNum; - private final int numTries; - private final Amount<Long, Time> retryDelay; - private final ExceptionalCommand<E> task; - private final CommandExecutor commandExecutor; - private final Class<E> exceptionClass; - - private static final Logger LOG = Logger.getLogger(RetryingRunnable.class.getName()); - - /** - * Create a Task with name {@code name} that executes at most {@code numTries} - * in case of failure with an interval of {@code retryDelay} between attempts. - * - * @param name Human readable name for this task. - * @param task the task to execute. 
- * @param exceptionClass class of the exception thrown by the task. - * @param numTries the total number of times to try. - * @param retryDelay the delay between successive tries. - * @param commandExecutor Executor to resubmit retries to. - * @param tryNum the seq number of this try. - */ - public RetryingRunnable( - String name, - ExceptionalCommand<E> task, - Class<E> exceptionClass, - int numTries, - Amount<Long, Time> retryDelay, - CommandExecutor commandExecutor, - int tryNum) { - - this.name = checkNotNull(name); - this.task = checkNotNull(task); - this.exceptionClass = checkNotNull(exceptionClass); - this.retryDelay = checkNotNull(retryDelay); - this.commandExecutor = checkNotNull(commandExecutor); - checkArgument(numTries > 0); - this.tryNum = tryNum; - this.numTries = numTries; - } - - /** - * Create a Task with name {@code name} that executes at most {@code numTries} - * in case of failure with an interval of {@code retryDelay} between attempts - * and sets tryNum to be the first (=1). - * - * @param name Human readable name for this task. - * @param task the task to execute. - * @param exceptionClass class of the exception thrown by the task. - * @param numTries the total number of times to try. - * @param retryDelay the delay between successive tries. - * @param commandExecutor Executor to resubmit retries to. 
- */ - public RetryingRunnable( - String name, - ExceptionalCommand<E> task, - Class<E> exceptionClass, - int numTries, - Amount<Long, Time> retryDelay, - CommandExecutor commandExecutor) { - - this(name, task, exceptionClass, numTries, retryDelay, commandExecutor, /*tryNum=*/ 1); - } - - @Override - public void run() { - try { - task.execute(); - } catch (Exception e) { - if (e.getClass().isAssignableFrom(exceptionClass)) { - if (tryNum < numTries) { - commandExecutor.execute(name, task, exceptionClass, numTries - 1, retryDelay); - } else { - LOG.log(Level.INFO, "Giving up on task: " + name + " " - + "after " + "trying " + numTries + " times" + ".", e); - } - } else { - LOG.log(Level.INFO, "Giving up on task: " + name + " after trying " - + numTries + " times. " + "due to unhandled exception ", e); - throw Throwables.propagate(e); - } - } - } - - @Override - public String toString() { - return new ToStringBuilder(this) - .append("name", name) - .append("tryNum", tryNum) - .append("numTries", numTries) - .append("retryDelay", retryDelay) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java b/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java deleted file mode 100644 index 72605ed..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/caching/Cache.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
/**
 * Minimal contract for a key-value cache. Both keys and values are expected to be valid,
 * non-null objects at all times.
 *
 * @author William Farner
 */
public interface Cache<K, V> {

  /**
   * Looks up a cached value.
   *
   * @param key Key of the value to look up, must not be {@code null}.
   * @return The value cached under {@code key}, or {@code null} if there is no such entry.
   */
  V get(K key);

  /**
   * Adds a key-value pair to the cache.
   *
   * @param key Key to store the value under, must not be {@code null}.
   * @param value Value to store, must not be {@code null}.
   */
  void put(K key, V value);

  /**
   * Removes an entry from the cache.
   *
   * @param key Key of the entry to remove, must not be {@code null}.
   */
  void delete(K key);
}
/**
 * A proxy class that handles caching of return values for method calls to a wrapped object.
 *
 * The proxy starts in record mode: a call on the caching proxy is not executed but remembered,
 * and the following {@code cache(...)} call attaches a cache to that remembered method. Once
 * {@link #prepare()} is called the proxy switches to serving mode, where cached methods consult
 * their cache first and all other methods are delegated to the wrapped object.
 *
 * Example usage:
 *
 *   Foo uncached = new Foo();
 *   CachingMethodProxy<Foo> methodProxy = CachingMethodProxy.proxyFor(uncached, Foo.class);
 *   Foo foo = methodProxy.getCachingProxy();
 *   methodProxy.cache(foo.doBar(), lruCache1)
 *              .cache(foo.doBaz(), lruCache2)
 *              .prepare();
 *
 * @author William Farner
 */
public class CachingMethodProxy<T> {

  // Dummy return values to return when in recording state.
  private static final Map<Class<?>, Object> EMPTY_RETURN_VALUES =
      ImmutableMap.<Class<?>, Object>builder()
      .put(Boolean.TYPE, Boolean.FALSE)
      .put(Byte.TYPE, Byte.valueOf((byte) 0))
      .put(Short.TYPE, Short.valueOf((short) 0))
      .put(Character.TYPE, Character.valueOf((char)0))
      .put(Integer.TYPE, Integer.valueOf(0))
      .put(Long.TYPE, Long.valueOf(0))
      .put(Float.TYPE, Float.valueOf(0))
      .put(Double.TYPE, Double.valueOf(0))
      .build();
  // Maps each primitive type to its wrapper class.
  private static final Map<Class<?>, Class<?>> AUTO_BOXING_MAP =
      ImmutableMap.<Class<?>, Class<?>>builder()
      .put(Boolean.TYPE, Boolean.class)
      .put(Byte.TYPE, Byte.class)
      .put(Short.TYPE, Short.class)
      .put(Character.TYPE, Character.class)
      .put(Integer.TYPE, Integer.class)
      .put(Long.TYPE, Long.class)
      .put(Float.TYPE, Float.class)
      .put(Double.TYPE, Double.class)
      .build();

  // The uncached resource, whose method calls are deemed to be expensive and cacheable.
  private final T uncached;

  // The methods that are cached, and the caches themselves.
  private final Map<Method, MethodCache> methodCaches = Maps.newHashMap();
  private final Class<T> type;

  // The method most recently invoked on the proxy in record mode, awaiting a cache(...) call.
  private Method lastMethodCall = null;
  // True until prepare() is called; while true, proxy calls are recorded instead of executed.
  private boolean recordMode = true;

  /**
   * Creates a new caching method proxy that will wrap an object and cache for the provided methods.
   *
   * @param uncached The uncached object that will be reverted to when a cache entry is not present.
   * @param type The interface to proxy; rejected with an IllegalArgumentException if it is not an
   *     interface.
   */
  private CachingMethodProxy(T uncached, Class<T> type) {
    this.uncached = Preconditions.checkNotNull(uncached);
    this.type = Preconditions.checkNotNull(type);
    Preconditions.checkArgument(type.isInterface(), "The proxied type must be an interface.");
  }

  /**
   * Reflectively invokes {@code method} on {@code subject}, unwrapping the
   * InvocationTargetException so the caller sees the exception the target method threw.
   */
  private static Object invokeMethod(Object subject, Method method, Object[] args)
      throws Throwable {
    try {
      return method.invoke(subject, args);
    } catch (IllegalAccessException e) {
      throw new RuntimeException("Cannot access " + subject.getClass() + "." + method, e);
    } catch (InvocationTargetException e) {
      throw e.getCause();
    }
  }

  /**
   * A cached method and its caching control structures.
   *
   * @param <K> Cache key type.
   * @param <V> Cache value type, expected to match the return type of the method.
   */
  private static class MethodCache<K, V> {
    private final Method method;
    private final Cache<K, V> cache;
    private final Function<Object[], K> keyBuilder;
    private final Predicate<V> entryFilter;

    MethodCache(Method method, Cache<K, V> cache, Function<Object[], K> keyBuilder,
        Predicate<V> entryFilter) {
      this.method = method;
      this.cache = cache;
      this.keyBuilder = keyBuilder;
      this.entryFilter = entryFilter;
    }

    /**
     * Returns the cached value for the key built from {@code args}, or invokes the real method
     * on a miss. A fetched value is stored only when it is non-null and accepted by the entry
     * filter; a null result is returned without being cached.
     */
    V doInvoke(Object uncached, Object[] args) throws Throwable {
      K key = keyBuilder.apply(args);

      V cachedValue = cache.get(key);

      if (cachedValue != null) return cachedValue;

      Object fetched = invokeMethod(uncached, method, args);

      if (fetched == null) return null;

      // Unchecked by design: V is expected to match the method's return type, see class doc.
      @SuppressWarnings("unchecked")
      V typedValue = (V) fetched;

      if (entryFilter.apply(typedValue)) cache.put(key, typedValue);

      return typedValue;
    }
  }

  /**
   * Creates a new builder for the given type.
   *
   * @param uncached The uncached object that should be insulated by caching.
   * @param type The interface that a proxy should be created for.
   * @param <T> Type parameter to the proxied class.
   * @return A new builder.
   */
  public static <T> CachingMethodProxy<T> proxyFor(T uncached, Class<T> type) {
    return new CachingMethodProxy<T>(uncached, type);
  }

  /**
   * Creates a dynamic proxy implementing the proxied interface. Every call on the returned
   * object is routed through this CachingMethodProxy, so it records calls until
   * {@link #prepare()} is invoked and serves them afterwards.
   *
   * @return A new proxy instance of the proxied interface.
   */
  @SuppressWarnings("unchecked")
  public T getCachingProxy() {
    return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class[] { type },
        new InvocationHandler() {
          @Override public Object invoke(Object proxy, Method method, Object[] args)
              throws Throwable {
            return doInvoke(method, args);
          }
        });
  }

  // Dispatches a proxy call according to the current mode.
  private Object doInvoke(Method method, Object[] args) throws Throwable {
    return recordMode ? recordCall(method) : cacheRequest(method, args);
  }

  /**
   * Remembers {@code method} as the target of the next cache(...) call. Void methods and
   * zero-argument methods are rejected, as is a second recorded call before the previous one
   * received its cache instructions. Returns a dummy value of the method's return type.
   */
  private Object recordCall(Method method) {
    Preconditions.checkArgument(method.getReturnType() != Void.TYPE,
        "Void return methods cannot be cached: " + method);
    Preconditions.checkArgument(method.getParameterTypes().length > 0,
        "Methods with zero arguments cannot be cached: " + method);
    Preconditions.checkState(lastMethodCall == null,
        "No cache instructions provided for call to: " + lastMethodCall);

    lastMethodCall = method;

    // Primitive return types cannot be null, so hand back the matching zero value instead.
    Class<?> returnType = method.getReturnType();
    return returnType.isPrimitive() ? EMPTY_RETURN_VALUES.get(returnType) : null;
  }

  // Serves a call in serving mode: through the method's cache if one was configured, otherwise
  // straight to the wrapped object.
  private Object cacheRequest(Method method, Object[] args) throws Throwable {
    MethodCache cache = methodCaches.get(method);

    // Check if we are caching for this method.
    if (cache == null) return invokeMethod(uncached, method, args);

    return cache.doInvoke(uncached, args);
  }

  /**
   * Instructs the proxy that cache setup is complete, and the proxy instance should begin caching
   * and delegating uncached calls.  After this is called, any subsequent calls to any of the
   * cache setup methods will result in an {@link IllegalStateException}.
   */
  public void prepare() {
    Preconditions.checkState(!methodCaches.isEmpty(), "At least one method must be cached.");
    Preconditions.checkState(recordMode, "prepare() may only be invoked once.");

    recordMode = false;
  }

  /**
   * Caches the most recently recorded method in {@code cache}, keyed by the list of call
   * arguments and storing every non-null result.
   *
   * @param value Ignored; present so the recorded proxy call can be written inline.
   * @param cache The cache to store results in.
   * @return This proxy, for chaining.
   */
  public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache) {
    return cache(value, cache, Predicates.<V>alwaysTrue());
  }

  /**
   * Caches the most recently recorded method in {@code cache}, keyed by the list of call
   * arguments and storing only results accepted by {@code valueFilter}.
   *
   * @param value Ignored; present so the recorded proxy call can be written inline.
   * @param cache The cache to store results in.
   * @param valueFilter Decides whether a fetched result is stored.
   * @return This proxy, for chaining.
   */
  public <V> CachingMethodProxy<T> cache(V value, Cache<List, V> cache,
      Predicate<V> valueFilter) {
    return cache(value, cache, DEFAULT_KEY_BUILDER, valueFilter);
  }

  /**
   * Caches the most recently recorded method in {@code cache}, keyed by {@code keyBuilder} and
   * storing every non-null result.
   *
   * @param value Ignored; present so the recorded proxy call can be written inline.
   * @param cache The cache to store results in.
   * @param keyBuilder Builds the cache key from the call arguments.
   * @return This proxy, for chaining.
   */
  public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache,
      Function<Object[], K> keyBuilder) {
    // Get the last method call and declare it the cached method.
    return cache(value, cache, keyBuilder, Predicates.<V>alwaysTrue());
  }

  /**
   * Caches the most recently recorded method in {@code cache}, keyed by {@code keyBuilder} and
   * storing only results accepted by {@code valueFilter}.
   *
   * @param value Ignored; present so the recorded proxy call can be written inline.
   * @param cache The cache to store results in.
   * @param keyBuilder Builds the cache key from the call arguments.
   * @param valueFilter Decides whether a fetched result is stored.
   * @return This proxy, for chaining.
   * @throws IllegalStateException if called after {@link #prepare()} or when no proxy call has
   *     been recorded since the last cache(...) call.
   */
  public <K, V> CachingMethodProxy<T> cache(V value, Cache<K, V> cache,
      Function<Object[], K> keyBuilder, Predicate<V> valueFilter) {
    Preconditions.checkNotNull(cache);
    Preconditions.checkNotNull(keyBuilder);
    Preconditions.checkNotNull(valueFilter);

    Preconditions.checkState(recordMode, "Cache setup is not allowed after prepare() is called.");

    // Get the last method call and declare it the cached method.
    Preconditions.checkState(lastMethodCall != null, "No method call captured to be cached.");

    Class<?> returnType = lastMethodCall.getReturnType();

    Preconditions.checkArgument(returnType != Void.TYPE,
        "Cannot cache results from void method: " + lastMethodCall);

    if (returnType.isPrimitive()) {
      // If a primitive type is returned, we need to make sure that the cache holds the boxed
      // type for the primitive.
      returnType = AUTO_BOXING_MAP.get(returnType);
    }

    // TODO(William Farner): Figure out a simple way to make this possible.  Right now, since the proxy
    // objects return null, we get a null here and can't check the type.
    //Preconditions.checkArgument(value.getClass() == returnType,
    //    String.format("Cache value type '%s' does not match method return type '%s'",
    //        value.getClass(), lastMethodCall.getReturnType()));

    methodCaches.put(lastMethodCall, new MethodCache<K, V>(lastMethodCall, cache, keyBuilder,
        valueFilter));

    // Clear the recorded call so the next proxy call can be recorded.
    lastMethodCall = null;

    return this;
  }

  // Default cache key: the call arguments viewed as a list.
  private static final Function<Object[], List> DEFAULT_KEY_BUILDER =
      new Function<Object[], List>() {
        @Override public List apply(Object[] args) {
          return Arrays.asList(args);
        }
      };
}
- */ -package org.apache.aurora.common.util.caching; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.collections.Pair; -import org.apache.aurora.common.stats.Stats; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -/** - * A cache with a fixed maximum size, evicting items that were used least-recently. - * WARNING: This is not thread-safe. If you wish to get a thread-safe version of a constructed - * LRUCache, you must wrap it with {@link Collections#synchronizedMap(java.util.Map)}. - * - * @author William Farner - */ -public class LRUCache<K, V> implements Cache<K, V> { - - private Map<K, V> map; - - private final AtomicLong accesses; - private final AtomicLong misses; - - /** - * Creates a new bounded cache with the given load factor. - * - * @param name Unique name for this cache. - * @param maxCapacity Maximum capacity for the cache, after which items will be evicted. - * @param loadFactor Load factor for the cache. - * @param makeSynchronized Whether the underlying map should be synchronized. - * @param evictionListener Listener to be notified when an element is evicted, or {@code null} if - * eviction notifications are not needed. - */ - private LRUCache(final String name, final int maxCapacity, float loadFactor, - boolean makeSynchronized, final Closure<Pair<K, V>> evictionListener) { - map = new LinkedHashMap<K, V>(maxCapacity, loadFactor, true /* Access order. 
*/) { - @Override public boolean removeEldestEntry(Map.Entry<K, V> entry) { - boolean evict = size() > maxCapacity; - if (evict && evictionListener != null) { - evictionListener.execute(Pair.of(entry.getKey(), entry.getValue())); - } - return evict; - } - }; - - if (makeSynchronized) { - map = Collections.synchronizedMap(map); - } - - accesses = Stats.exportLong(name + "_lru_cache_accesses"); - misses = Stats.exportLong(name + "_lru_cache_misses"); - } - - public static <K, V> Builder<K, V> builder() { - return new Builder<K, V>(); - } - - public static class Builder<K, V> { - private String name = null; - - private int maxSize = 1000; - - // Sadly, LinkedHashMap doesn't expose this, so the default is pulled from the javadoc. - private float loadFactor = 0.75F; - - private boolean makeSynchronized = true; - - private Closure<Pair<K, V>> evictionListener = null; - - public Builder<K, V> name(String name) { - this.name = MorePreconditions.checkNotBlank(name); - return this; - } - - public Builder<K, V> maxSize(int maxSize) { - Preconditions.checkArgument(maxSize > 0); - this.maxSize = maxSize; - return this; - } - - public Builder<K, V> loadFactor(float loadFactor) { - this.loadFactor = loadFactor; - return this; - } - - public Builder<K, V> makeSynchronized(boolean makeSynchronized) { - this.makeSynchronized = makeSynchronized; - return this; - } - - public Builder<K, V> evictionListener(Closure<Pair<K, V>> evictionListener) { - this.evictionListener = evictionListener; - return this; - } - - public LRUCache<K, V> build() { - return new LRUCache<K, V>(name, maxSize, loadFactor, makeSynchronized, evictionListener); - } - } - - @Override - public V get(K key) { - accesses.incrementAndGet(); - V value = map.get(key); - if (value == null) { - misses.incrementAndGet(); - } - return value; - } - - @Override - public void put(K key, V value) { - map.put(key, value); - } - - @Override - public void delete(K key) { - map.remove(key); - } - - public int size() { - return 
map.size(); - } - - @Override - public String toString() { - return String.format("size: %d, accesses: %s, misses: %s", - map.size(), - accesses, - misses); - } - - public Collection<V> copyValues() { - synchronized(map) { - return ImmutableList.copyOf(map.values()); - } - } - - public long getAccesses() { - return accesses.longValue(); - } - - public long getMisses() { - return misses.longValue(); - } - - public double getHitRate() { - double numAccesses = accesses.longValue(); - return numAccesses == 0 ? 0 : (numAccesses - misses.longValue()) / numAccesses; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java deleted file mode 100644 index 2f963cf..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/concurrent/BackingOffFutureTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.common.util.concurrent; - -import com.google.common.base.Preconditions; -import org.apache.aurora.common.util.BackoffStrategy; - -import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A {@link RetryingFutureTask} that will resubmit itself to a work queue with a backoff. - * - * @author William Farner - */ -public class BackingOffFutureTask extends RetryingFutureTask { - private final ScheduledExecutorService executor; - private final BackoffStrategy backoffStrategy; - private long backoffMs = 0; - - /** - * Creates a new retrying future task that will execute a unit of work until successfully - * completed, or the retry limit has been reached. - * - * @param executor The executor service to resubmit the task to upon failure. - * @param callable The unit of work. The work is considered successful when {@code true} is - * returned. It may return {@code false} or throw an exception when - * unsueccessful. - * @param maxRetries The maximum number of times to retry the task. - * @param backoffStrategy Strategy to use for determining backoff duration. 
- */ - public BackingOffFutureTask(ScheduledExecutorService executor, Callable<Boolean> callable, - int maxRetries, BackoffStrategy backoffStrategy) { - super(executor, callable, maxRetries); - this.executor = executor; - this.backoffStrategy = Preconditions.checkNotNull(backoffStrategy); - } - - @Override - protected void retry() { - backoffMs = backoffStrategy.calculateBackoffMs(backoffMs); - executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/356eeac9/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java b/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java deleted file mode 100644 index 7448dc1..0000000 --- a/commons/src/main/java/org/apache/aurora/common/util/concurrent/RetryingFutureTask.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * A future task that supports retries by resubmitting itself to an {@link ExecutorService}.
 *
 * <p>The future completes with {@code true} as soon as the unit of work reports success, and with
 * {@code false} once the retry limit has been exceeded, so callers blocked in {@link #get()} are
 * always released. If the task cannot be resubmitted, the future completes exceptionally with the
 * failure raised by {@link #retry()}.
 *
 * @author William Farner
 */
public class RetryingFutureTask extends FutureTask<Boolean> {
  private static final Logger LOG = Logger.getLogger(RetryingFutureTask.class.getName());

  protected final ExecutorService executor;
  protected final int maxRetries;
  protected int numRetries = 0;
  protected final Callable<Boolean> callable;

  /**
   * Creates a new retrying future task that will execute a unit of work until successfully
   * completed, or the retry limit has been reached.
   *
   * @param executor The executor service to resubmit the task to upon failure.
   * @param callable The unit of work. The work is considered successful when {@code true} is
   *     returned. It may return {@code false} or throw an exception when unsuccessful.
   * @param maxRetries The maximum number of times to retry the task.
   * @throws NullPointerException If {@code executor} or {@code callable} is null.
   */
  public RetryingFutureTask(ExecutorService executor, Callable<Boolean> callable, int maxRetries) {
    super(callable);
    this.callable = java.util.Objects.requireNonNull(callable);
    this.executor = java.util.Objects.requireNonNull(executor);
    this.maxRetries = maxRetries;
  }

  /**
   * Invokes a retry of this task by handing it back to the executor.
   */
  protected void retry() {
    executor.execute(this);
  }

  /**
   * Runs one attempt of the unit of work and either completes the future or schedules a retry.
   *
   * <p>Does nothing when the future is already completed or cancelled, so a cancelled task stops
   * retrying and a finished task never re-runs its work.
   */
  @Override
  public void run() {
    if (isDone()) {
      return;
    }

    boolean success = false;
    try {
      success = callable.call();
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
      }
      LOG.log(Level.WARNING, "Exception while executing task.", e);
    }

    if (success) {
      set(true);
      return;
    }

    numRetries++;
    if (numRetries > maxRetries) {
      LOG.severe("Task did not complete after " + maxRetries + " retries, giving up.");
      // Complete the future; otherwise anything waiting on get() would block forever.
      set(false);
      return;
    }

    LOG.info("Task was not successful, resubmitting (num retries: " + numRetries + ")");
    try {
      retry();
    } catch (RuntimeException e) {
      // E.g. the executor rejected the task; no further attempt will ever run.
      LOG.log(Level.SEVERE, "Unable to resubmit task, giving up.", e);
      setException(e);
    }
  }
}
/**
 * A custom java.util.logging configuration class that loads the logging configuration from a
 * properties file resource (as opposed to a file as natively supported by LogManager via
 * java.util.logging.config.file). By default this configurator will look for the resource at
 * /logging.properties but the resource path can be overridden by setting the system property with
 * key {@link #LOGGING_PROPERTIES_RESOURCE_PATH java.util.logging.config.resource}. To install this
 * configurator you must specify the following system property:
 * java.util.logging.config.class=org.apache.aurora.common.util.logging.ResourceLoggingConfigurator
 *
 * @author John Sirois
 */
public class ResourceLoggingConfigurator {

  /**
   * A system property that controls where ResourceLoggingConfigurator looks for the logging
   * configuration on the process classpath.
   */
  public static final String LOGGING_PROPERTIES_RESOURCE_PATH = "java.util.logging.config.resource";

  /**
   * Locates the logging configuration resource and applies it to the global {@link LogManager}.
   *
   * @throws IOException If the configuration resource cannot be read.
   * @throws NullPointerException If no resource exists at the configured resource path.
   */
  public ResourceLoggingConfigurator() throws IOException {
    String loggingPropertiesResourcePath =
        System.getProperty(LOGGING_PROPERTIES_RESOURCE_PATH, "/logging.properties");
    InputStream loggingConfig = getClass().getResourceAsStream(loggingPropertiesResourcePath);
    if (loggingConfig == null) {
      throw new NullPointerException(
          "Could not locate logging config file at resource path: "
              + loggingPropertiesResourcePath);
    }
    try {
      LogManager.getLogManager().readConfiguration(loggingConfig);
    } finally {
      // readConfiguration does not close the stream it is given.
      loggingConfig.close();
    }
  }
}
/**
 * A LogManager which by default ignores calls to {@link #reset()}. This is useful to avoid missing
 * log statements that occur during vm shutdown. The standard LogManager installs a
 * {@link Runtime#addShutdownHook(Thread) shutdown hook} that disables logging, and this subclass
 * nullifies that shutdown hook by disabling any reset of the LogManager by default.
 *
 * @author John Sirois
 */
public class UnresettableLogManager extends LogManager {

  /**
   * Name of the system property that selects which LogManager the java.util.logging subsystem
   * loads.
   */
  public static final String LOGGING_MANAGER = "java.util.logging.manager";

  /**
   * Name of the system property that tunes an {@code UnresettableLogManager}. Applications that
   * install this manager but still want {@link LogManager#reset()} to take effect can set the
   * property to "false"; when it is unset, resets are ignored.
   */
  private static final String LOGGING_MANAGER_IGNORERESET = "java.util.logging.manager.ignorereset";

  /**
   * Performs a real reset only when the ignore-reset property parses as {@code false}; otherwise
   * the request is dropped and a notice is written to standard error.
   */
  @Override
  public void reset() throws SecurityException {
    final String ignoreResetSetting = System.getProperty(LOGGING_MANAGER_IGNORERESET, "true");
    if (!Boolean.parseBoolean(ignoreResetSetting)) {
      super.reset();
      return;
    }
    System.err.println("UnresettableLogManager is ignoring a reset() request.");
  }
}
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.zookeeper; - -import java.net.InetSocketAddress; -import java.util.List; -import java.util.Map; - -import com.google.common.base.Joiner; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import org.apache.aurora.common.base.Command; -import org.apache.aurora.common.base.Commands; -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.zookeeper.Group.JoinException; -import org.apache.aurora.common.thrift.ServiceInstance; -import org.apache.aurora.common.thrift.Status; - -/** - * A ServerSet that delegates all calls to other ServerSets. - */ -public class CompoundServerSet implements ServerSet { - private static final Joiner STACK_TRACE_JOINER = Joiner.on('\n'); - - private final List<ServerSet> serverSets; - private final Map<ServerSet, ImmutableSet<ServiceInstance>> instanceCache = Maps.newHashMap(); - private final List<HostChangeMonitor<ServiceInstance>> monitors = Lists.newArrayList(); - private Command stopWatching = null; - private ImmutableSet<ServiceInstance> allHosts = ImmutableSet.of(); - - /** - * Create new ServerSet from a list of serverSets. - * - * @param serverSets serverSets to which the calls will be delegated. 
- */ - public CompoundServerSet(Iterable<ServerSet> serverSets) { - MorePreconditions.checkNotBlank(serverSets); - this.serverSets = ImmutableList.copyOf(serverSets); - } - - private interface JoinOp { - EndpointStatus doJoin(ServerSet serverSet) throws JoinException, InterruptedException; - } - - private interface StatusOp { - void changeStatus(EndpointStatus status) throws UpdateException; - } - - private void changeStatus( - ImmutableList<EndpointStatus> statuses, - StatusOp statusOp) throws UpdateException { - - ImmutableList.Builder<String> builder = ImmutableList.builder(); - int errorIdx = 1; - for (EndpointStatus endpointStatus : statuses) { - try { - statusOp.changeStatus(endpointStatus); - } catch (UpdateException exception) { - builder.add(String.format("[%d] %s", errorIdx++, - Throwables.getStackTraceAsString(exception))); - } - } - if (errorIdx > 1) { - throw new UpdateException( - "One or more ServerSet update failed: " + STACK_TRACE_JOINER.join(builder.build())); - } - } - - private EndpointStatus doJoin(JoinOp joiner) throws JoinException, InterruptedException { - // Get the list of endpoint status from the serverSets. 
- ImmutableList.Builder<EndpointStatus> builder = ImmutableList.builder(); - for (ServerSet serverSet : serverSets) { - builder.add(joiner.doJoin(serverSet)); - } - - final ImmutableList<EndpointStatus> statuses = builder.build(); - - return new EndpointStatus() { - @Override public void leave() throws UpdateException { - changeStatus(statuses, new StatusOp() { - @Override public void changeStatus(EndpointStatus status) throws UpdateException { - status.leave(); - } - }); - } - - @Override public void update(final Status newStatus) throws UpdateException { - changeStatus(statuses, new StatusOp() { - @Override public void changeStatus(EndpointStatus status) throws UpdateException { - status.update(newStatus); - } - }); - } - }; - } - - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints) - throws Group.JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - return serverSet.join(endpoint, additionalEndpoints); - } - }); - } - - /* - * If any one of the serverSet throws an exception during respective join, the exception is - * propagated. Join is successful only if all the joins are successful. - * - * NOTE: If an exception occurs during the join, the serverSets in the composite can be in a - * partially joined state. 
- * - * @see ServerSet#join(InetSocketAddress, Map, Status) - */ - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final Status status) throws Group.JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - - return serverSet.join(endpoint, additionalEndpoints, status); - } - }); - } - - @Override - public EndpointStatus join( - final InetSocketAddress endpoint, - final Map<String, InetSocketAddress> additionalEndpoints, - final int shardId) throws JoinException, InterruptedException { - - return doJoin(new JoinOp() { - @Override public EndpointStatus doJoin(ServerSet serverSet) - throws JoinException, InterruptedException { - - return serverSet.join(endpoint, additionalEndpoints, shardId); - } - }); - } - - // Handles changes to the union of hosts. - private synchronized void handleChange(ServerSet serverSet, ImmutableSet<ServiceInstance> hosts) { - instanceCache.put(serverSet, hosts); - - // Get the union of hosts. - ImmutableSet<ServiceInstance> currentHosts = - ImmutableSet.copyOf(Iterables.concat(instanceCache.values())); - - // Check if the hosts have changed. - if (!currentHosts.equals(allHosts)) { - allHosts = currentHosts; - - // Notify the monitors. - for (HostChangeMonitor<ServiceInstance> monitor : monitors) { - monitor.onChange(allHosts); - } - } - } - - /** - * Monitor the CompoundServerSet. - * - * If any one of the monitor calls to the underlying serverSet raises a MonitorException, the - * exception is propagated. The call is successful only if all the monitor calls to the - * underlying serverSets are successful. - * - * NOTE: If an exception occurs during the monitor call, the serverSets in the composite will not - * be monitored. - * - * @param monitor HostChangeMonitor instance used to monitor host changes. 
- * @return A command that, when executed, will stop monitoring all underlying server sets. - * @throws MonitorException If there was a problem monitoring any of the underlying server sets. - */ - @Override - public synchronized Command watch(HostChangeMonitor<ServiceInstance> monitor) - throws MonitorException { - if (stopWatching == null) { - monitors.add(monitor); - ImmutableList.Builder<Command> commandsBuilder = ImmutableList.builder(); - - for (final ServerSet serverSet : serverSets) { - commandsBuilder.add(serverSet.watch(new HostChangeMonitor<ServiceInstance>() { - @Override public void onChange(ImmutableSet<ServiceInstance> hostSet) { - handleChange(serverSet, hostSet); - } - })); - } - - stopWatching = Commands.compound(commandsBuilder.build()); - } - - return stopWatching; - } - - @Override - public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { - watch(monitor); - } -}
