Merge branch '1.6' into 1.7
Conflicts:
proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
server/master/src/main/java/org/apache/accumulo/master/Master.java
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/022225b8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/022225b8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/022225b8
Branch: refs/heads/master
Commit: 022225b853f4dafc4361aefbff9897f21e88e235
Parents: 1853d08 44b17c6
Author: Josh Elser <[email protected]>
Authored: Tue Dec 1 16:44:01 2015 -0500
Committer: Josh Elser <[email protected]>
Committed: Tue Dec 1 16:44:01 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/accumulo/proxy/Proxy.java | 3 +-
.../apache/accumulo/server/rpc/RpcWrapper.java | 79 +++++-
.../accumulo/server/rpc/RpcWrapperTest.java | 266 +++++++++++++++++++
.../accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../java/org/apache/accumulo/master/Master.java | 5 +-
.../apache/accumulo/tserver/TabletServer.java | 5 +-
6 files changed, 346 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
----------------------------------------------------------------------
diff --cc proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 1ce02e8,3368d20..5fa64e4
--- a/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/proxy/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@@ -23,26 -23,13 +23,27 @@@ import java.io.InputStream
import java.util.Properties;
import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
-import org.apache.accumulo.server.util.RpcWrapper;
-import org.apache.log4j.Logger;
+import org.apache.accumulo.server.metrics.MetricsFactory;
+import org.apache.accumulo.server.rpc.RpcWrapper;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.rpc.TimedProcessor;
+import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.thrift.TBaseProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@@ -161,113 -118,35 +162,113 @@@ public class Proxy implements KeywordEx
Class<? extends TProtocolFactory> protoFactoryClass =
Class.forName(opts.prop.getProperty("protocolFactory",
TCompactProtocol.Factory.class.getName()))
.asSubclass(TProtocolFactory.class);
+ TProtocolFactory protoFactory = protoFactoryClass.newInstance();
int port = Integer.parseInt(opts.prop.getProperty("port"));
- TServer server = createProxyServer(AccumuloProxy.class,
ProxyServer.class, port, protoFactoryClass, opts.prop);
- server.serve();
+ String hostname = opts.prop.getProperty(THRIFT_SERVER_HOSTNAME,
THRIFT_SERVER_HOSTNAME_DEFAULT);
+ HostAndPort address = HostAndPort.fromParts(hostname, port);
+ ServerAddress server = createProxyServer(address, protoFactory,
opts.prop);
+ // Wait for the server to come up
+ while (!server.server.isServing()) {
+ Thread.sleep(100);
+ }
+ log.info("Proxy server started on " + server.getAddress());
+ while (server.server.isServing()) {
+ Thread.sleep(1000);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new Proxy().execute(args);
+ }
+
+ public static ServerAddress createProxyServer(HostAndPort address,
TProtocolFactory protocolFactory, Properties properties) throws Exception {
+ return createProxyServer(address, protocolFactory, properties,
ClientConfiguration.loadDefault());
}
- public static TServer createProxyServer(Class<?> api, Class<?> implementor,
final int port, Class<? extends TProtocolFactory> protoClass,
- Properties properties) throws Exception {
- final TNonblockingServerSocket socket = new
TNonblockingServerSocket(port);
+ public static ServerAddress createProxyServer(HostAndPort address,
TProtocolFactory protocolFactory, Properties properties, ClientConfiguration
clientConf)
+ throws Exception {
+ final int numThreads =
Integer.parseInt(properties.getProperty(THRIFT_THREAD_POOL_SIZE_KEY,
THRIFT_THREAD_POOL_SIZE_DEFAULT));
+ final long maxFrameSize =
AccumuloConfiguration.getMemoryInBytes(properties.getProperty(THRIFT_MAX_FRAME_SIZE_KEY,
THRIFT_MAX_FRAME_SIZE_DEFAULT));
+ final int simpleTimerThreadpoolSize =
Integer.parseInt(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE.getDefaultValue());
+ // How frequently to try to resize the thread pool
+ final long threadpoolResizeInterval = 1000l * 5;
+ // No timeout
+ final long serverSocketTimeout = 0l;
+ // Use the new hadoop metrics2 support
+ final MetricsFactory metricsFactory = new MetricsFactory(false);
+ final String serverName = "Proxy", threadName = "Accumulo Thrift Proxy";
- // create the implementor
- Object impl =
implementor.getConstructor(Properties.class).newInstance(properties);
+ // create the implementation of the proxy interface
+ ProxyServer impl = new ProxyServer(properties);
- Class<?> proxyProcClass = Class.forName(api.getName() + "$Processor");
- Class<?> proxyIfaceClass = Class.forName(api.getName() + "$Iface");
+ // Wrap the implementation -- translate some exceptions
- AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl);
++ AccumuloProxy.Iface wrappedImpl = RpcWrapper.service(impl, new
AccumuloProxy.Processor<AccumuloProxy.Iface>(impl).getProcessMapView());
+
+ // Create the processor from the implementation
+ TProcessor processor = new
AccumuloProxy.Processor<AccumuloProxy.Iface>(wrappedImpl);
+
+ // Get the type of thrift server to instantiate
+ final String serverTypeStr = properties.getProperty(THRIFT_SERVER_TYPE,
THRIFT_SERVER_TYPE_DEFAULT);
+ ThriftServerType serverType = DEFAULT_SERVER_TYPE;
+ if (!THRIFT_SERVER_TYPE_DEFAULT.equals(serverTypeStr)) {
+ serverType = ThriftServerType.get(serverTypeStr);
+ }
+
+ SslConnectionParams sslParams = null;
+ SaslServerConnectionParams saslParams = null;
+ switch (serverType) {
+ case SSL:
+ sslParams =
SslConnectionParams.forClient(ClientContext.convertClientConfig(clientConf));
+ break;
+ case SASL:
+ if
(!clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(),
false)) {
+ // ACCUMULO-3651 Changed level to error and added FATAL to message
for slf4j capability
+ log.error("FATAL: SASL thrift server was requested but it is
disabled in client configuration");
+ throw new RuntimeException("SASL is not enabled in configuration");
+ }
+
+ // Kerberos needs to be enabled to use it
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ // ACCUMULO-3651 Changed level to error and added FATAL to message
for slf4j capability
+ log.error("FATAL: Hadoop security is not enabled");
+ throw new RuntimeException();
+ }
+
+ // Login via principal and keytab
+ final String kerberosPrincipal =
properties.getProperty(KERBEROS_PRINCIPAL, ""),
+ kerberosKeytab = properties.getProperty(KERBEROS_KEYTAB, "");
+ if (StringUtils.isBlank(kerberosPrincipal) ||
StringUtils.isBlank(kerberosKeytab)) {
+ // ACCUMULO-3651 Changed level to error and added FATAL to message
for slf4j capability
+ log.error("FATAL: Kerberos principal and keytab must be provided");
+ throw new RuntimeException();
+ }
+ UserGroupInformation.loginUserFromKeytab(kerberosPrincipal,
kerberosKeytab);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ log.info("Logged in as " + ugi.getUserName());
+
+ // The kerberosPrimary set in the SASL server needs to match the
principal we're logged in as.
+ final String shortName = ugi.getShortUserName();
+ log.info("Setting server primary to {}", shortName);
+ clientConf.setProperty(ClientProperty.KERBEROS_SERVER_PRIMARY,
shortName);
+
+ KerberosToken token = new KerberosToken();
+ saslParams = new SaslServerConnectionParams(clientConf, token, null);
+
+ processor = new UGIAssumingProcessor(processor);
+
+ break;
+ default:
+ // nothing to do -- no extra configuration necessary
+ break;
+ }
- @SuppressWarnings("unchecked")
- Constructor<? extends TProcessor> proxyProcConstructor = (Constructor<?
extends TProcessor>) proxyProcClass.getConstructor(proxyIfaceClass);
+ // Hook up support for tracing for thrift calls
+ TimedProcessor timedProcessor = new TimedProcessor(metricsFactory,
processor, serverName, threadName);
- @SuppressWarnings({"rawtypes", "unchecked"})
- final TProcessor processor =
proxyProcConstructor.newInstance(RpcWrapper.service(impl, ((TBaseProcessor)
proxyProcConstructor.newInstance(impl)).getProcessMapView()));
+ // Create the thrift server with our processor and properties
+ ServerAddress serverAddr = TServerUtils.startTServer(address, serverType,
timedProcessor, protocolFactory, serverName, threadName, numThreads,
+ simpleTimerThreadpoolSize, threadpoolResizeInterval, maxFrameSize,
sslParams, saslParams, serverSocketTimeout);
- THsHaServer.Args args = new THsHaServer.Args(socket);
- args.processor(processor);
- final long maxFrameSize =
AccumuloConfiguration.getMemoryInBytes(properties.getProperty("maxFrameSize",
"16M"));
- if (maxFrameSize > Integer.MAX_VALUE)
- throw new RuntimeException(maxFrameSize + " is larger than MAX_INT");
- args.transportFactory(new TFramedTransport.Factory((int) maxFrameSize));
- args.protocolFactory(protoClass.newInstance());
- args.maxReadBufferBytes = maxFrameSize;
- return new THsHaServer(args);
+ return serverAddr;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
----------------------------------------------------------------------
diff --cc
server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
index 62d39d2,0000000..585eb27
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/RpcWrapper.java
@@@ -1,65 -1,0 +1,128 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Map.Entry;
++import java.util.Set;
+
+import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
+import org.apache.accumulo.core.trace.wrappers.TraceWrap;
++import org.apache.thrift.ProcessFunction;
+import org.apache.thrift.TApplicationException;
++import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class accommodates the changes in THRIFT-1805, which appeared in
Thrift 0.9.1 and restricts client-side notification of server-side errors to
+ * {@link TException} only, by wrapping {@link RuntimeException} and {@link
Error} as {@link TException}, so it doesn't just close the connection and look
like
+ * a network issue, but informs the client that a {@link
TApplicationException} had occurred, as it did in Thrift 0.9.0. This performs
similar functions as
+ * {@link TraceWrap}, but with the additional action of translating
exceptions. See also ACCUMULO-1691 and ACCUMULO-2950.
+ *
++ * ACCUMULO-4065 found that the above exception-wrapping is not appropriate
for Thrift's implementation of oneway methods. A oneway method is defined as a
++ * method whose return the client does not wait for. Normally, this is
acceptable as these methods are void. Therefore, if another client reuses the
++ * connection to send a new RPC, there is no "extra" data sitting on the
InputStream from the Socket (that the server sent). However, the implementation
of a
++ * oneway method <em>does</em> send a response to the client when the
implementation throws a {@link TException}. This message showing up on the
client's
++ * InputStream makes the Thrift Connection unusable for any future use.
As long as the Thrift implementation sends a message back when oneway methods
++ * throw a {@link TException}, we must make sure that we don't
re-wrap-and-throw any exceptions as {@link TException}s.
++ *
+ * @since 1.6.1
+ */
+public class RpcWrapper {
++ private static final Logger log = LoggerFactory.getLogger(RpcWrapper.class);
++
++ public static <T> T service(final T instance, @SuppressWarnings("rawtypes")
final Map<String,ProcessFunction<T,? extends TBase>> processorView) {
++ final Set<String> onewayMethods = getOnewayMethods(processorView);
++ log.debug("Found oneway Thrift methods: " + onewayMethods);
++
++ InvocationHandler handler = getInvocationHandler(instance, onewayMethods);
++
++ @SuppressWarnings("unchecked")
++ T proxiedInstance = (T)
Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
++ return proxiedInstance;
++ }
+
- public static <T> T service(final T instance) {
- InvocationHandler handler = new RpcServerInvocationHandler<T>(instance) {
++ protected static <T> RpcServerInvocationHandler<T>
getInvocationHandler(final T instance, final Set<String> onewayMethods) {
++ return new RpcServerInvocationHandler<T>(instance) {
+ private final Logger log = LoggerFactory.getLogger(instance.getClass());
+
+ @Override
+ public Object invoke(Object obj, Method method, Object[] args) throws
Throwable {
++ // e.g. ThriftClientHandler.flush(TInfo, TCredentials, ...)
+ try {
+ return super.invoke(obj, method, args);
+ } catch (RuntimeException e) {
+ String msg = e.getMessage();
- log.error("{}", msg, e);
++ log.error(msg, e);
++ if (onewayMethods.contains(method.getName())) {
++ throw e;
++ }
+ throw new TException(msg);
+ } catch (Error e) {
+ String msg = e.getMessage();
- log.error("{}", msg, e);
++ log.error(msg, e);
++ if (onewayMethods.contains(method.getName())) {
++ throw e;
++ }
+ throw new TException(msg);
+ }
+ }
+ };
-
- @SuppressWarnings("unchecked")
- T proxiedInstance = (T)
Proxy.newProxyInstance(instance.getClass().getClassLoader(),
instance.getClass().getInterfaces(), handler);
- return proxiedInstance;
+ }
+
++ protected static <T> Set<String>
getOnewayMethods(@SuppressWarnings("rawtypes") Map<String,ProcessFunction<T,?
extends TBase>> processorView) {
++ // Get a handle on the isOnewayMethod and make it accessible
++ final Method isOnewayMethod;
++ try {
++ isOnewayMethod = ProcessFunction.class.getDeclaredMethod("isOneway");
++ } catch (NoSuchMethodException e) {
++ throw new RuntimeException("Could not access isOneway method", e);
++ } catch (SecurityException e) {
++ throw new RuntimeException("Could not access isOneway method", e);
++ }
++ // In java7, this appears to be copying the method, but it's trivial for
us to return the object to how it was before.
++ final boolean accessible = isOnewayMethod.isAccessible();
++ isOnewayMethod.setAccessible(true);
++
++ try {
++ final Set<String> onewayMethods = new HashSet<String>();
++ for (@SuppressWarnings("rawtypes")
++ Entry<String,ProcessFunction<T,? extends TBase>> entry :
processorView.entrySet()) {
++ try {
++ if ((Boolean) isOnewayMethod.invoke(entry.getValue())) {
++ onewayMethods.add(entry.getKey());
++ }
++ } catch (RuntimeException e) {
++ throw e;
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++ return onewayMethods;
++ } finally {
++ // Reset it back to how it was.
++ isOnewayMethod.setAccessible(accessible);
++ }
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
----------------------------------------------------------------------
diff --cc
server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
index 0000000,0000000..11e8031
new file mode 100644
--- /dev/null
+++
b/server/base/src/test/java/org/apache/accumulo/server/rpc/RpcWrapperTest.java
@@@ -1,0 -1,0 +1,266 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to you under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.server.rpc;
++
++import java.util.Collections;
++import java.util.HashMap;
++import java.util.Map;
++import java.util.Set;
++
++import org.apache.accumulo.core.trace.wrappers.RpcServerInvocationHandler;
++import org.apache.accumulo.server.rpc.RpcWrapper;
++import org.apache.thrift.ProcessFunction;
++import org.apache.thrift.TBase;
++import org.apache.thrift.TException;
++import org.apache.thrift.protocol.TProtocol;
++import org.junit.Assert;
++import org.junit.Test;
++
++import com.google.common.collect.Sets;
++
++/**
++ * Verification that RpcWrapper correctly mangles Exceptions to work around
Thrift.
++ */
++public class RpcWrapperTest {
++
++ private static final String RTE_MESSAGE = "RpcWrapperTest's
RuntimeException Message";
++
++ /**
++ * Given a method name and whether or not the method is oneway, construct a
ProcessFunction.
++ *
++ * @param methodName
++ * The service method name.
++ * @param isOneway
++ * Is the method oneway.
++ * @return A ProcessFunction.
++ */
++ private fake_proc<FakeService> createProcessFunction(String methodName,
boolean isOneway) {
++ return new fake_proc<FakeService>(methodName, isOneway);
++ }
++
++ @Test
++ public void testSomeOnewayMethods() {
++ @SuppressWarnings("rawtypes")
++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++ procs.put("foo", createProcessFunction("foo", true));
++ procs.put("foobar", createProcessFunction("foobar", false));
++ procs.put("bar", createProcessFunction("bar", true));
++ procs.put("barfoo", createProcessFunction("barfoo", false));
++
++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
++ Assert.assertEquals(Sets.newHashSet("foo", "bar"), onewayMethods);
++ }
++
++ @Test
++ public void testNoOnewayMethods() {
++ @SuppressWarnings("rawtypes")
++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++ procs.put("foo", createProcessFunction("foo", false));
++ procs.put("foobar", createProcessFunction("foobar", false));
++ procs.put("bar", createProcessFunction("bar", false));
++ procs.put("barfoo", createProcessFunction("barfoo", false));
++
++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
++ Assert.assertEquals(Collections.<String> emptySet(), onewayMethods);
++ }
++
++ @Test
++ public void testAllOnewayMethods() {
++ @SuppressWarnings("rawtypes")
++ Map<String,ProcessFunction<FakeService,? extends TBase>> procs = new
HashMap<String,ProcessFunction<FakeService,? extends TBase>>();
++ procs.put("foo", createProcessFunction("foo", true));
++ procs.put("foobar", createProcessFunction("foobar", true));
++ procs.put("bar", createProcessFunction("bar", true));
++ procs.put("barfoo", createProcessFunction("barfoo", true));
++
++ Set<String> onewayMethods = RpcWrapper.getOnewayMethods(procs);
++ Assert.assertEquals(Sets.newHashSet("foo", "foobar", "bar", "barfoo"),
onewayMethods);
++ }
++
++ @Test
++ public void testNoExceptionWrappingForOneway() throws Throwable {
++ final Object[] args = new Object[0];
++
++ final FakeService impl = new FakeServiceImpl();
++
++ // "short" names throw RTEs and are oneway, while long names do not throw
exceptions and are not oneway.
++ RpcServerInvocationHandler<FakeService> handler =
RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foo", "bar"));
++
++ // Should throw an exception, but not be wrapped because the method is
oneway
++ try {
++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
++ Assert.fail("Expected an exception");
++ } catch (RuntimeException e) {
++ Assert.assertEquals(RTE_MESSAGE, e.getMessage());
++ }
++
++ // Should not throw an exception
++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
++ }
++
++ @Test
++ public void testExceptionWrappingForNonOneway() throws Throwable {
++ final Object[] args = new Object[0];
++
++ final FakeService impl = new FakeServiceImpl();
++
++ // "short" names throw RTEs and are not oneway, while long names do not
throw exceptions and are oneway.
++ RpcServerInvocationHandler<FakeService> handler =
RpcWrapper.getInvocationHandler(impl, Sets.newHashSet("foobar", "barfoo"));
++
++ // Should throw an exception, and it should be wrapped because the method is
not oneway
++ try {
++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foo"), args);
++ Assert.fail("Expected an exception");
++ } catch (TException e) {
++ // The InvocationHandler should take the exception from the RTE and
make it a TException
++ Assert.assertEquals(RTE_MESSAGE, e.getMessage());
++ }
++
++ // Should not throw an exception
++ handler.invoke(impl, FakeServiceImpl.class.getMethod("foobar"), args);
++ }
++
++ //
++ // Some hacked together classes/interfaces that mimic what Thrift is doing.
++ //
++
++ /**
++ * Some fake fields for our fake arguments.
++ */
++ private static class fake_fields implements org.apache.thrift.TFieldIdEnum {
++ @Override
++ public short getThriftFieldId() {
++ return 0;
++ }
++
++ @Override
++ public String getFieldName() {
++ return null;
++ }
++ }
++
++ /**
++ * A fake thrift service
++ */
++ interface FakeService {
++ void foo();
++
++ String foobar();
++
++ int bar();
++
++ long barfoo();
++ }
++
++ /**
++ * An implementation of the fake thrift service. The "short" names throw
RTEs, while long names do not.
++ */
++ public static class FakeServiceImpl implements FakeService {
++ @Override
++ public void foo() {
++ throw new RuntimeException(RTE_MESSAGE);
++ }
++
++ @Override
++ public String foobar() {
++ return "";
++ }
++
++ @Override
++ public int bar() {
++ throw new RuntimeException(RTE_MESSAGE);
++ }
++
++ @Override
++ public long barfoo() {
++ return 0;
++ }
++ };
++
++ /**
++ * A fake ProcessFunction implementation for testing that allows injection
of method name and oneway.
++ */
++ private static class fake_proc<I extends FakeService> extends
org.apache.thrift.ProcessFunction<I,foo_args> {
++ final private boolean isOneway;
++
++ public fake_proc(String methodName, boolean isOneway) {
++ super(methodName);
++ this.isOneway = isOneway;
++ }
++
++ @Override
++ protected boolean isOneway() {
++ return isOneway;
++ }
++
++ @SuppressWarnings("rawtypes")
++ @Override
++ public TBase getResult(I iface, foo_args args) throws TException {
++ return null;
++ }
++
++ @Override
++ public foo_args getEmptyArgsInstance() {
++ return null;
++ }
++ }
++
++ /**
++ * Fake arguments for our fake service.
++ */
++ private static class foo_args implements
org.apache.thrift.TBase<foo_args,fake_fields> {
++
++ private static final long serialVersionUID = 1L;
++
++ @Override
++ public int compareTo(foo_args o) {
++ return 0;
++ }
++
++ @Override
++ public void read(TProtocol iprot) throws TException {}
++
++ @Override
++ public void write(TProtocol oprot) throws TException {}
++
++ @Override
++ public fake_fields fieldForId(int fieldId) {
++ return null;
++ }
++
++ @Override
++ public boolean isSet(fake_fields field) {
++ return false;
++ }
++
++ @Override
++ public Object getFieldValue(fake_fields field) {
++ return null;
++ }
++
++ @Override
++ public void setFieldValue(fake_fields field, Object value) {}
++
++ @Override
++ public TBase<foo_args,fake_fields> deepCopy() {
++ return null;
++ }
++
++ @Override
++ public void clear() {}
++ }
++}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --cc
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 1daafcb,b4afda8..4d55461
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -709,16 -702,9 +709,16 @@@ public class SimpleGarbageCollector ext
}
private HostAndPort startStatsService() throws UnknownHostException {
- Iface rpcProxy = RpcWrapper.service(this);
- Processor<Iface> processor = new
Processor<Iface>(RpcWrapper.service(this, new
Processor<Iface>(this).getProcessMapView()));
- int port = config.getPort(Property.GC_PORT);
- long maxMessageSize =
config.getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
++ Iface rpcProxy = RpcWrapper.service(this, new
Processor<Iface>(this).getProcessMapView());
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcProxy = TCredentialsUpdatingWrapper.service(rpcProxy,
getClass(), getConfiguration());
+ processor = new Processor<Iface>(tcProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
+ int port = getConfiguration().getPort(Property.GC_PORT);
+ long maxMessageSize =
getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port);
log.debug("Starting garbage collector listening on " + result);
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/Master.java
index 3a28eeb,af481c8..02e1132
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@@ -1147,38 -1032,10 +1147,38 @@@ public class Master extends AccumuloSer
throw new IOException(e);
}
- Processor<Iface> processor = new Processor<Iface>(RpcWrapper.service(new
MasterClientServiceHandler(this),
- new Processor<Iface>(new
MasterClientServiceHandler(this)).getProcessMapView()));
- ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(),
hostname, Property.MASTER_CLIENTPORT, processor, "Master",
- "Master Client Service Handler", null, Property.MASTER_MINTHREADS,
Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+ ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);
+
+ // Make sure that we have a secret key (either a new one or an old one
from ZK) before we start
+ // the master client service.
+ if (null != authenticationTokenKeyManager && null != keyDistributor) {
+ log.info("Starting delegation-token key manager");
+ keyDistributor.initialize();
+ authenticationTokenKeyManager.start();
+ boolean logged = false;
+ while (!authenticationTokenKeyManager.isInitialized()) {
+ // Print out a status message when we start waiting for the key
manager to get initialized
+ if (!logged) {
+ log.info("Waiting for AuthenticationTokenKeyManager to be
initialized");
+ logged = true;
+ }
+ UtilWaitThread.sleep(200);
+ }
+ // And log when we are initialized
+ log.info("AuthenticationTokenSecretManager is initialized");
+ }
+
+ clientHandler = new MasterClientServiceHandler(this);
- Iface rpcProxy = RpcWrapper.service(clientHandler);
++ Iface rpcProxy = RpcWrapper.service(clientHandler, new
Processor<Iface>(clientHandler).getProcessMapView());
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy,
clientHandler.getClass(), getConfiguration());
+ processor = new Processor<Iface>(tcredsProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
+ ServerAddress sa = TServerUtils.startServer(this, hostname,
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service
Handler", null,
+ Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
clientService = sa.server;
String address = sa.address.toString();
log.info("Setting master lock data to " + address);
@@@ -1187,42 -1044,6 +1187,43 @@@
while (!clientService.isServing()) {
UtilWaitThread.sleep(100);
}
+
+ // Start the daemon to scan the replication table and make units of work
+ replicationWorkDriver = new ReplicationDriver(this);
+ replicationWorkDriver.start();
+
+ // Start the daemon to assign work to tservers to replicate to our peers
+ try {
+ replicationWorkAssigner = new WorkDriver(this);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ log.error("Caught exception trying to initialize replication
WorkDriver", e);
+ throw new RuntimeException(e);
+ }
+ replicationWorkAssigner.start();
+
+ // Start the replication coordinator which assigns tservers to service
replication requests
++ MasterReplicationCoordinator impl = new
MasterReplicationCoordinator(this);
+ ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>
replicationCoordinatorProcessor = new
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
- RpcWrapper.service(new MasterReplicationCoordinator(this)));
++ RpcWrapper.service(impl, new
ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(impl).getProcessMapView()));
+ ServerAddress replAddress = TServerUtils.startServer(this, hostname,
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
+ "Master Replication Coordinator", "Replication Coordinator", null,
Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+ Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
+
+ log.info("Started replication coordinator service at " +
replAddress.address);
+
+ // Advertise the port we used so peers don't have to be told what it is
+
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance())
+ Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
+ replAddress.address.toString().getBytes(UTF_8),
NodeExistsPolicy.OVERWRITE);
+
+ // Register replication metrics
+ MasterMetricsFactory factory = new
MasterMetricsFactory(getConfiguration(), this);
+ Metrics replicationMetrics = factory.createReplicationMetrics();
+ try {
+ replicationMetrics.register();
+ } catch (Exception e) {
+ log.error("Failed to register replication metrics", e);
+ }
+
while (clientService.isServing()) {
UtilWaitThread.sleep(500);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/022225b8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --cc
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 034cb16,651df66..3022a76
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -2331,46 -3159,15 +2331,47 @@@ public class TabletServer extends Accum
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
- Iface rpcProxy = RpcWrapper.service(new ThriftClientHandler());
+ ThriftClientHandler handler = new ThriftClientHandler();
- Iface tch = RpcWrapper.service(handler, new
Processor<Iface>(handler).getProcessMapView());
- Processor<Iface> processor = new Processor<Iface>(tch);
- HostAndPort address = startServer(getSystemConfiguration(),
clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor, "Thrift
Client Server");
++ Iface rpcProxy = RpcWrapper.service(handler, new
Processor<Iface>(handler).getProcessMapView());
+ final Processor<Iface> processor;
+ if (ThriftServerType.SASL == getThriftServerType()) {
+ Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy,
ThriftClientHandler.class, getConfiguration());
+ processor = new Processor<Iface>(tcredProxy);
+ } else {
+ processor = new Processor<Iface>(rpcProxy);
+ }
+ HostAndPort address =
startServer(getServerConfigurationFactory().getConfiguration(),
clientAddress.getHostText(), Property.TSERV_CLIENTPORT, processor,
+ "Thrift Client Server");
log.info("address = " + address);
return address;
}
- ZooLock getLock() {
+ private HostAndPort startReplicationService() throws UnknownHostException {
+ final ReplicationServicerHandler handler = new
ReplicationServicerHandler(this);
- ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler);
++ ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler, new
ReplicationServicer.Processor<ReplicationServicer.Iface>(handler).getProcessMapView());
+ ReplicationServicer.Iface repl =
TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(),
getConfiguration());
+ ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new
ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
+ AccumuloConfiguration conf =
getServerConfigurationFactory().getConfiguration();
+ Property maxMessageSizeProperty =
(conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ?
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+ ServerAddress sp = TServerUtils.startServer(this,
clientAddress.getHostText(), Property.REPLICATION_RECEIPT_SERVICE_PORT,
processor,
+ "ReplicationServicerHandler", "Replication Servicer", null,
Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK,
maxMessageSizeProperty);
+ this.replServer = sp.server;
+ log.info("Started replication service on " + sp.address);
+
+ try {
+ // The replication service is unique to the thrift service for a
tserver, not just a host.
+ // Advertise the host and port for replication service given the host
and port for the tserver.
+
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance())
+ ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress.toString(),
+ sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+ } catch (Exception e) {
+ log.error("Could not advertise replication service port", e);
+ throw new RuntimeException(e);
+ }
+
+ return sp.address;
+ }
+
+ public ZooLock getLock() {
return tabletServerLock;
}