Repository: tajo Updated Branches: refs/heads/index_support 86c97b2a1 -> 42bcf2de0
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index be757af..84decd5 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -32,19 +32,18 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; -import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ProtoUtil; import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; @@ -53,7 +52,7 @@ import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProto public class SessionConnection implements Closeable { - private final Log LOG = LogFactory.getLog(TajoClientImpl.class); + private final static Log LOG = LogFactory.getLog(SessionConnection.class); final RpcClientManager manager; @@ -70,6 +69,8 @@ public class SessionConnection implements Closeable { private ServiceTracker serviceTracker; + private NettyClientBase client; + private KeyValueSet properties; /** @@ -88,27 +89,34 @@ public class SessionConnection implements Closeable { this.manager = RpcClientManager.getInstance(); this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); - this.manager.setTimeoutSeconds( - properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout - this.userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; this.serviceTracker = tracker; + try { + this.client = getTajoMasterConnection(); + } catch (ServiceException e) { + throw new IOException(e); + } } public Map<String, String> getClientSideSessionVars() { return Collections.unmodifiableMap(sessionVarsCache); } - public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, - ConnectException, ClassNotFoundException { - return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); - } - - public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) - throws NoSuchMethodException, ConnectException, ClassNotFoundException { - return manager.getClient(addr, protocolClass, asyncMode); + public synchronized NettyClientBase getTajoMasterConnection() throws ServiceException { + if (client != null && client.isConnected()) return client; + else { + try { + RpcClientManager.cleanup(client); + // Client do not closed on idle state for support high available + this.client = manager.newClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false, + manager.getRetries(), 0, TimeUnit.SECONDS, false); + } catch (Exception e) { + throw new ServiceException(e); + } + return client; + } } protected KeyValueSet getProperties() { @@ -129,10 +137,9 @@ public class SessionConnection implements Closeable { } public boolean isConnected() { - if(!closed.get()){ + if (!closed.get()) { try { - return manager.getClient(serviceTracker.getClientServiceAddress(), - TajoMasterClientProtocol.class, false).isConnected(); + return getTajoMasterConnection().isConnected(); } catch (Throwable e) { return false; } @@ -145,64 +152,51 @@ public class SessionConnection implements Closeable { } public String getCurrentDatabase() throws ServiceException { - return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public String call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getCurrentDatabase(null, sessionId).getValue(); - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getCurrentDatabase(null, sessionId).getValue(); } public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - KeyValueSet keyValueSet = new KeyValueSet(); - keyValueSet.putAll(variables); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .setSessionVars(keyValueSet.getProto()).build(); - - SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - - if (response.getResultCode() == ResultCode.OK) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw new ServiceException(response.getMessage()); - } - } - }.withRetries(); - } + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(variables); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); + SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .addAllUnsetVariables(variables).build(); + if (response.getResultCode() == ResultCode.OK) { + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); + } else { + throw new ServiceException(response.getMessage()); + } + } - SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); + public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException { + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - if (response.getResultCode() == ResultCode.OK) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw new ServiceException(response.getMessage()); - } - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .addAllUnsetVariables(variables).build(); + + SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); + + if (response.getResultCode() == ResultCode.OK) { + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); + } else { + throw new ServiceException(response.getMessage()); + } } void updateSessionVarsCache(Map<String, String> variables) { @@ -213,35 +207,26 @@ public class SessionConnection implements Closeable { } public String getSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public String call(NettyClientBase client) throws ServiceException { - - synchronized (sessionVarsCache) { - // If a desired variable is client side one and exists in the cache, immediately return the variable. - if (sessionVarsCache.containsKey(varname)) { - return sessionVarsCache.get(varname); - } - } + synchronized (sessionVarsCache) { + // If a desired variable is client side one and exists in the cache, immediately return the variable. + if (sessionVarsCache.containsKey(varname)) { + return sessionVarsCache.get(varname); + } + } - checkSessionAndGet(client); + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue(); - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue(); } public Boolean existSessionVariable(final String varname) throws ServiceException { - return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue(); - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue(); } public Map<String, String> getCachedAllSessionVariables() { @@ -251,29 +236,19 @@ public class SessionConnection implements Closeable { } public Map<String, String> getAllSessionVariables() throws ServiceException { - return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, - false) { - - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId)); - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId)); } public Boolean selectDatabase(final String databaseName) throws ServiceException { - Boolean selected = new ServerCallable<Boolean>(manager, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); - public Boolean call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue(); - } - }.withRetries(); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + boolean selected = tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue(); if (selected) { this.baseDatabase = databaseName; @@ -283,14 +258,14 @@ public class SessionConnection implements Closeable { @Override public void close() { - if(closed.getAndSet(true)){ + if (closed.getAndSet(true)) { return; } // remove session NettyClientBase client = null; try { - client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + client = getTajoMasterConnection(); TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); tajoMaster.removeSession(null, sessionId); } catch (Throwable e) { @@ -333,55 +308,51 @@ public class SessionConnection implements Closeable { } public boolean reconnect() throws Exception { - return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); - builder.setUsername(userInfo.getUserName()).build(); - if (baseDatabase != null) { - builder.setBaseDatabaseName(baseDatabase); - } + CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); + builder.setUsername(userInfo.getUserName()).build(); + if (baseDatabase != null) { + builder.setBaseDatabaseName(baseDatabase); + } + NettyClientBase client = getTajoMasterConnection(); - // create new session - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); - if (response.getResultCode() != ResultCode.OK) { - return false; - } + // create new session + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); + if (response.getResultCode() != ResultCode.OK) { + return false; + } - // Invalidate some session variables in client cache - sessionId = response.getSessionId(); - Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); - synchronized (sessionVarsCache) { - for (SessionVars var : UPDATE_ON_RECONNECT) { - String value = sessionVars.get(var.keyname()); - if (value != null) { - sessionVarsCache.put(var.keyname(), value); - } - } + // Invalidate some session variables in client cache + sessionId = response.getSessionId(); + Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); + synchronized (sessionVarsCache) { + for (SessionVars var : UPDATE_ON_RECONNECT) { + String value = sessionVars.get(var.keyname()); + if (value != null) { + sessionVarsCache.put(var.keyname(), value); } + } + } - // Update the session variables in server side - try { - KeyValueSet keyValueSet = new KeyValueSet(); - keyValueSet.putAll(sessionVarsCache); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .setSessionVars(keyValueSet.getProto()).build(); - - if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) { - tajoMasterService.removeSession(null, sessionId); - return false; - } - LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); - return true; - } catch (ServiceException e) { - tajoMasterService.removeSession(null, sessionId); - return false; - } + // Update the session variables in server side + try { + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(sessionVarsCache); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); + + if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) { + tajoMasterService.removeSession(null, sessionId); + return false; } - }.withRetries(); + LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); + return true; + } catch (ServiceException e) { + tajoMasterService.removeSession(null, sessionId); + return false; + } } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java index df709c5..0bb11e0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java @@ -88,7 +88,6 @@ public class TestDefaultCliOutputFormatter { String multiLineMessage = "ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" + "com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table: table1\n" + - "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n" + "\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" + "\tat org.apache.tajo.cli.tsql.commands.DescTableCommand.invoke(DescTableCommand.java:43)\n" + "\tat org.apache.tajo.cli.tsql.TajoCli.executeMetaCommand(TajoCli.java:300)\n" + @@ -96,9 +95,6 @@ public class TestDefaultCliOutputFormatter { "\tat org.apache.tajo.cli.tsql.TajoCli.runShell(TajoCli.java:271)\n" + "\tat org.apache.tajo.cli.tsql.TajoCli.main(TajoCli.java:420)\n" + "Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" + - "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" + - "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" + - "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" + "\t... 6 more"; assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage)); http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index b2e1ce9..b1a27fa 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -34,10 +34,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.master.event.QueryEvent; -import org.apache.tajo.master.event.QueryEventType; -import org.apache.tajo.master.event.StageEvent; -import org.apache.tajo.master.event.StageEventType; +import org.apache.tajo.master.event.*; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -131,8 +128,7 @@ public class TestKillQuery { assertNotNull(stage); // fire kill event - Query q = queryMasterTask.getQuery(); - q.handle(new QueryEvent(queryId, QueryEventType.KILL)); + queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL)); try { cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); @@ -157,24 +153,55 @@ public class TestKillQuery { @Test public final void testIgnoreStageStateFromKilled() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); - QueryId queryId = new QueryId(res.getQueryId()); - cluster.waitForQuerySubmitted(queryId); + SQLAnalyzer analyzer = new SQLAnalyzer(); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = LocalTajoTestingUtility.createDummySession(); + CatalogService catalog = cluster.getMaster().getCatalog(); + + LogicalPlanner planner = new LogicalPlanner(catalog); + LogicalOptimizer optimizer = new LogicalOptimizer(conf); + Expr expr = analyzer.parse(queryStr); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + + optimizer.optimize(plan); + + QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); + QueryContext queryContext = new QueryContext(conf); + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); + globalPlanner.build(masterPlan); - QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); - Query query = qmt.getQuery(); + CountDownLatch barrier = new CountDownLatch(1); + MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING); + + QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); + QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), + queryId, session, defaultContext, expr.toJson(), dispatch); - // wait for a stage created - cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10); - query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + queryMasterTask.init(conf); + queryMasterTask.getQueryTaskContext().getDispatcher().start(); + queryMasterTask.startQuery(); try{ - cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50); - } finally { - assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState()); + barrier.await(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState()); + } + + Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); + assertNotNull(stage); + + // fire kill event + queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try { + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); + } finally { + queryMasterTask.stop(); } - List<Stage> stages = Lists.newArrayList(query.getStages()); + List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages()); Stage lastStage = stages.get(stages.size() - 1); assertEquals(StageState.KILLED, lastStage.getSynchronizedState()); http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 190beae..8f6f9ed 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -252,11 +252,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable @Override public void close() { - getHandler().sendExceptions(getClass().getSimpleName() + "terminates all the connections"); - Channel channel = getChannel(); if (channel != null && channel.isOpen()) { LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); + /* channelInactive receives event and then client terminates all the requests */ channel.close().syncUninterruptibly(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java deleted file mode 100644 index 3c054ad..0000000 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import java.io.IOException; -import java.util.Date; -import java.util.List; - -public class RetriesExhaustedException extends RuntimeException { - private static final long serialVersionUID = 1876775844L; - - public RetriesExhaustedException(final String msg) { - super(msg); - } - - public RetriesExhaustedException(final String msg, final IOException e) { - super(msg, e); - } - - /** - * Datastructure that allows adding more info around Throwable incident. - */ - public static class ThrowableWithExtraContext { - private final Throwable t; - private final long when; - private final String extras; - - public ThrowableWithExtraContext(final Throwable t, final long when, - final String extras) { - this.t = t; - this.when = when; - this.extras = extras; - } - - @Override - public String toString() { - return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); - } - } - - /** - * Create a new RetriesExhaustedException from the list of prior failures. - * @param callableVitals Details from the {@link ServerCallable} we were using - * when we got this exception. - * @param numTries The number of tries we made - * @param exceptions List of exceptions that failed before giving up - */ - public RetriesExhaustedException(final String callableVitals, int numTries, - List<Throwable> exceptions) { - super(getMessage(callableVitals, numTries, exceptions)); - } - - /** - * Create a new RetriesExhaustedException from the list of prior failures. - * @param numTries - * @param exceptions List of exceptions that failed before giving up - */ - public RetriesExhaustedException(final int numTries, - final List<Throwable> exceptions) { - super(getMessage(numTries, exceptions)); - } - - private static String getMessage(String callableVitals, int numTries, - List<Throwable> exceptions) { - StringBuilder buffer = new StringBuilder("Failed contacting "); - buffer.append(callableVitals); - buffer.append(" after "); - buffer.append(numTries + 1); - buffer.append(" attempts.\nExceptions:\n"); - for (Throwable t : exceptions) { - buffer.append(t.toString()); - buffer.append("\n"); - } - return buffer.toString(); - } - - private static String getMessage(final int numTries, - final List<Throwable> exceptions) { - StringBuilder buffer = new StringBuilder("Failed after attempts="); - buffer.append(numTries + 1); - buffer.append(", exceptions:\n"); - for (Throwable t : exceptions) { - buffer.append(t.toString()); - buffer.append("\n"); - } - return buffer.toString(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java deleted file mode 100644 index 2804a03..0000000 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.ServiceException; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetSocketAddress; - -public abstract class ServerCallable<T> { - protected InetSocketAddress addr; - protected long startTime; - protected long endTime; - protected Class<?> protocol; - protected boolean asyncMode; - protected boolean closeConn; - protected RpcClientManager manager; - - public abstract T call(NettyClientBase client) throws Exception; - - public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol, - boolean asyncMode) { - this.manager = manager; - this.addr = addr; - this.protocol = protocol; - this.asyncMode = asyncMode; - } - - public void beforeCall() { - this.startTime = System.currentTimeMillis(); - } - - public long getStartTime(){ - return startTime; - } - - public void afterCall() { - this.endTime = System.currentTimeMillis(); - } - - public long getEndTime(){ - return endTime; - } - - boolean abort = false; - public void abort() { - abort = true; - } - /** - * Run this instance with retries, timed waits, - * and refinds of missing regions. - * - * @return an object of type T - * @throws com.google.protobuf.ServiceException if a remote or network exception occurs - */ - - public T withRetries() throws ServiceException { - //TODO configurable - final long pause = 500; //ms - final int numRetries = 3; - - for (int tries = 0; tries < numRetries; tries++) { - NettyClientBase client = null; - try { - beforeCall(); - if(addr != null) { - client = manager.getClient(addr, protocol, asyncMode); - } - return call(client); - } catch (IOException ioe) { - if(abort) { - throw new ServiceException(ioe.getMessage(), ioe); - } - if (tries == numRetries - 1) { - throw new ServiceException("Giving up after tries=" + tries, ioe); - } - } catch (Throwable t) { - throw new ServiceException(t); - } finally { - afterCall(); - if(closeConn) { - RpcClientManager.cleanup(client); - } - } - try { - Thread.sleep(pause * (tries + 1)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceException("Giving up after tries=" + tries, e); - } - } - return null; - } - - /** - * Run this instance against the server once. - * @return an object of type T - * @throws java.io.IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - public T withoutRetries() throws IOException, RuntimeException { - NettyClientBase client = null; - try { - beforeCall(); - client = manager.getClient(addr, protocol, asyncMode); - return call(client); - } catch (Throwable t) { - Throwable t2 = translateException(t); - if (t2 instanceof IOException) { - throw (IOException)t2; - } else { - throw new RuntimeException(t2); - } - } finally { - afterCall(); - if(closeConn) { - RpcClientManager.cleanup(client); - } - } - } - - private static Throwable translateException(Throwable t) throws IOException { - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); - } - if (t instanceof RemoteException && t.getCause() != null) { - t = t.getCause(); - } - return t; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 6f7fdd1..c86db80 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -170,45 +170,6 @@ public class TestBlockingRpc { } @Test - @SetupRpcConnection(setupRpcClient = false) - @Deprecated // serverCallable will be remove - public void testRpcWithServiceCallable() throws Exception { - RpcClientManager manager = RpcClientManager.getInstance(); - final SumRequest request = SumRequest.newBuilder() - .setX1(1) - .setX2(2) - .setX3(3.15d) - .setX4(2.0f).build(); - - SumResponse response = - new ServerCallable<SumResponse>(manager, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withRetries(); - - assertEquals(8.15d, response.getResult(), 1e-15); - - response = - new ServerCallable<SumResponse>(manager, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withoutRetries(); - - assertTrue(8.15d == response.getResult()); - RpcClientManager.close(); - } - - @Test public void testThrowException() throws Exception { EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build();
