TAJO-1860: Refactor Rpc clients to take Connection Parameters. Closes #763
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1eb10045 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1eb10045 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1eb10045 Branch: refs/heads/master Commit: 1eb10045908a8c5c6304e10379fa08a68bd61dad Parents: 5a15586 Author: Hyunsik Choi <[email protected]> Authored: Tue Sep 22 19:20:18 2015 -0700 Committer: Hyunsik Choi <[email protected]> Committed: Tue Sep 22 19:33:32 2015 -0700 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/catalog/CatalogClient.java | 14 +- .../tajo/cli/tsql/CliClientParamsFactory.java | 50 ++++++ .../java/org/apache/tajo/cli/tsql/TajoCli.java | 22 ++- .../tajo/client/ClientParameterHelper.java | 161 +++++++++++++++++++ .../apache/tajo/client/ClientParameters.java | 32 ++++ .../org/apache/tajo/client/QueryClientImpl.java | 14 +- .../apache/tajo/client/SessionConnection.java | 18 +-- .../tajo/client/v2/ClientDelegateFactory.java | 10 +- .../tajo/client/v2/LegacyClientDelegate.java | 13 +- .../org/apache/tajo/client/v2/TajoClient.java | 27 ++-- .../java/org/apache/tajo/conf/TajoConf.java | 4 + .../apache/tajo/exception/SQLExceptionUtil.java | 4 +- tajo-common/src/main/proto/errors.proto | 4 +- .../cli/tsql/TestDefaultCliOutputFormatter.java | 2 +- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 19 ++- .../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +- .../commands/TestExecExternalShellCommand.java | 2 +- .../tajo/cli/tsql/commands/TestHdfsCommand.java | 2 +- .../apache/tajo/util/TestRpcParamFactory.java | 58 +++++++ .../org/apache/tajo/master/QueryInProgress.java | 12 +- .../java/org/apache/tajo/master/TajoMaster.java | 7 - .../tajo/querymaster/DefaultTaskScheduler.java | 21 ++- .../apache/tajo/querymaster/QueryMaster.java | 20 +-- .../tajo/querymaster/QueryMasterTask.java | 11 +- .../java/org/apache/tajo/querymaster/Stage.java | 7 +- 
.../apache/tajo/util/RpcParameterFactory.java | 51 ++++++ .../tajo/worker/ExecutionBlockContext.java | 6 +- .../apache/tajo/worker/NodeStatusUpdater.java | 21 +-- .../java/org/apache/tajo/worker/TajoWorker.java | 5 - .../org/apache/tajo/worker/TaskManager.java | 12 +- .../ConnectivityCheckerRuleForTajoWorker.java | 4 +- tajo-docs/src/main/sphinx/jdbc_driver.rst | 43 +++++ .../sphinx/table_management/tablespaces.rst | 2 +- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 15 +- .../apache/tajo/jdbc/TestTajoJdbcNegative.java | 54 ++++++- .../java/org/apache/tajo/rpc/RpcConstants.java | 32 +++- .../org/apache/tajo/rpc/AsyncRpcClient.java | 43 +++-- .../org/apache/tajo/rpc/BlockingRpcClient.java | 45 +++--- .../org/apache/tajo/rpc/NettyClientBase.java | 68 +++++--- .../tajo/rpc/ProtoClientChannelInitializer.java | 25 +-- .../org/apache/tajo/rpc/RpcClientManager.java | 131 ++++----------- .../org/apache/tajo/rpc/RpcConnectionKey.java | 56 +++++++ .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 119 +++++++++----- .../org/apache/tajo/rpc/TestBlockingRpc.java | 115 +++++++++---- .../apache/tajo/rpc/TestRpcClientManager.java | 22 +-- 46 files changed, 1019 insertions(+), 388 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 63af5dd..cfad5fe 100644 --- a/CHANGES +++ b/CHANGES @@ -40,6 +40,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1860: Refactor Rpc clients to take Connection Parameters. (hyunsik) + TAJO-1868: Allow TablespaceManager::get to return a unregistered tablespace. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java index 80ded4a..2b24a6b 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java @@ -22,16 +22,14 @@ import com.google.protobuf.ServiceException; import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.rpc.*; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; +import java.util.Properties; /** * CatalogClient provides a client API to access the catalog server. 
@@ -71,10 +69,12 @@ public class CatalogClient extends AbstractCatalogClient { if (client != null && client.isConnected()) return client; RpcClientManager.cleanup(client); - int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES); + final Properties clientParams = new Properties(); + clientParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, conf.getVar(ConfVars.RPC_CLIENT_RETRY_NUM)); + // Client do not closed on idle state for support high available - this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false, - retry, 0, TimeUnit.SECONDS, false); + this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, + false, clientParams); } catch (Exception e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java new file mode 100644 index 0000000..5fabae8 --- /dev/null +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/CliClientParamsFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.cli.tsql; + +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.client.ClientParameters; + +import javax.validation.constraints.Null; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +class CliClientParamsFactory { + static Map<String, String> DEFAULT_PARAMS = new HashMap<>(); + + static { + // Keep lexicographic order of parameter names. + DEFAULT_PARAMS.put(ClientParameters.CONNECT_TIMEOUT, "0"); + DEFAULT_PARAMS.put(ClientParameters.SOCKET_TIMEOUT, "0"); + DEFAULT_PARAMS.put(ClientParameters.RETRY, "3"); + DEFAULT_PARAMS.put(ClientParameters.ROW_FETCH_SIZE, "200"); + DEFAULT_PARAMS.put(ClientParameters.USE_COMPRESSION, "false"); + } + + public static Properties get(@Nullable Properties connParam) { + Properties copy = connParam == null ? 
new Properties() : (Properties) connParam.clone(); + for (Map.Entry<String, String> entry : DEFAULT_PARAMS.entrySet()) { + if (!copy.contains(entry.getKey())) { + copy.setProperty(entry.getKey(), entry.getValue()); + } + } + return copy; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 83763e8..8ae7075 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -18,6 +18,7 @@ package org.apache.tajo.cli.tsql; +import com.google.common.collect.Maps; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; @@ -26,6 +27,7 @@ import jline.console.ConsoleReader; import org.apache.commons.cli.*; import org.apache.tajo.*; import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.cli.tsql.ParsedResult.StatementType; import org.apache.tajo.cli.tsql.SimpleParser.ParsingState; @@ -39,16 +41,14 @@ import org.apache.tajo.exception.TajoException; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ShutdownHookManager; import java.io.*; import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; public class TajoCli { public static final int SHUTDOWN_HOOK_PRIORITY = 50; @@ -176,7 +176,9 @@ public class TajoCli { } } - 
public TajoCli(TajoConf c, String [] args, InputStream in, OutputStream out) throws Exception { + public TajoCli(TajoConf c, String [] args, @Nullable Properties clientParams, InputStream in, OutputStream out) + throws Exception { + CommandLineParser parser = new PosixParser(); CommandLine cmd = parser.parse(options, args); @@ -234,14 +236,18 @@ public class TajoCli { } } + // Get connection parameters + Properties defaultConnParams = CliClientParamsFactory.get(clientParams); + final KeyValueSet actualConnParams = new KeyValueSet(Maps.fromProperties(defaultConnParams)); + if ((hostName == null) ^ (port == null)) { System.err.println(ERROR_PREFIX + "cannot find valid Tajo server address"); throw new RuntimeException("cannot find valid Tajo server address"); } else if (hostName != null && port != null) { conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port); - client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams); } else if (hostName == null && port == null) { - client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase, actualConnParams); } try { @@ -694,7 +700,7 @@ public class TajoCli { public static void main(String [] args) throws Exception { TajoConf conf = new TajoConf(); - TajoCli shell = new TajoCli(conf, args, System.in, System.out); + TajoCli shell = new TajoCli(conf, args, new Properties(), System.in, System.out); System.out.println(); System.exit(shell.runShell()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java 
b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java new file mode 100644 index 0000000..8549178 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameterHelper.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import org.apache.tajo.SessionVars; +import org.apache.tajo.rpc.RpcConstants; +import org.apache.tajo.util.Pair; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.tajo.SessionVars.COMPRESSED_RESULT_TRANSFER; +import static org.apache.tajo.SessionVars.FETCH_ROWNUM; +import static org.apache.tajo.client.ClientParameterHelper.ActionType.CONNECTION_PARAM; +import static org.apache.tajo.client.ClientParameterHelper.ActionType.SESSION_UPDATE; +import static org.apache.tajo.rpc.RpcConstants.CLIENT_CONNECTION_TIMEOUT; +import static org.apache.tajo.rpc.RpcConstants.CLIENT_SOCKET_TIMEOUT; + +/** + * <ul> + * <li><code>useCompression=bool</code> - Enable compressed transfer for ResultSet. 
</li> + * <li><code>defaultRowFetchSize=int</code> - Determines the number of rows fetched into a ResultSet in + * one round trip to the server.</li> + * <li><code>connectTimeout=int</code> - The timeout value used for socket connect operations. + * If connecting to the server takes longer than this value, the connection is broken. The + * timeout is specified in seconds and a value of zero means that it is disabled.</li> + * <li><code>socketTimeout=int</code> - The timeout value used for socket read operations. + * If reading from the server takes longer than this value, the connection is closed. + * This can be used as both a brute force global query timeout and a method of detecting + * network problems. The timeout is specified in seconds and a value of zero means that + * it is disabled.</li> + * <li><code>retry=int</code> - Number of retries. The Tajo JDBC driver is resilient + * against some network or connection problems; this value determines how many times a connection is retried.</li> + * </ul> + */ +class ClientParameterHelper { + + public static Map<String, Action> PARAMETERS = new HashMap<>(); + + static { + PARAMETERS.put(ClientParameters.USE_COMPRESSION, new SimpleSessionAction(COMPRESSED_RESULT_TRANSFER)); + PARAMETERS.put(ClientParameters.ROW_FETCH_SIZE, new SimpleSessionAction(FETCH_ROWNUM)); + PARAMETERS.put(ClientParameters.CONNECT_TIMEOUT, new ConnectionParamAction() { + @Override + Pair<String, String> doAction(String param) { + int seconds = Integer.parseInt(param); + // convert seconds into milliseconds + return new Pair<>(CLIENT_CONNECTION_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(seconds))); + } + }); + PARAMETERS.put(ClientParameters.SOCKET_TIMEOUT, new ConnectionParamAction() { + @Override + Pair<String, String> doAction(String param) { + int seconds = Integer.parseInt(param); + return new Pair<>(CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(seconds))); + } + }); + PARAMETERS.put(ClientParameters.RETRY, new 
SimpleConnectionParamAction(RpcConstants.CLIENT_RETRY_NUM)); + } + + enum ActionType { + SESSION_UPDATE, + CONNECTION_PARAM + } + + interface Action { + ActionType type(); + } + + static class SimpleSessionAction extends SessionAction { + private final String sessionKey; + + SimpleSessionAction(SessionVars sessionVar) { + this.sessionKey = sessionVar.name(); + } + + Pair<String, String> doAction(String param) { + return new Pair<>(sessionKey, param); + } + } + + @SuppressWarnings("unused") + static abstract class SessionAction implements Action { + + @Override + public ActionType type() { + return SESSION_UPDATE; + } + + abstract Pair<String, String> doAction(String param); + } + + static class SimpleConnectionParamAction extends ConnectionParamAction { + final String connParamKey; + + SimpleConnectionParamAction(String connParamKey) { + this.connParamKey = connParamKey; + } + + public Pair<String, String> doAction(String param) { + return new Pair<>(connParamKey, param); + } + } + + @SuppressWarnings("unused") + static abstract class ConnectionParamAction implements Action { + + @Override + public ActionType type() { + return ActionType.CONNECTION_PARAM; + } + + abstract Pair<String, String> doAction(String param); + } + + public static Properties getConnParams(Collection<Map.Entry<String, String>> properties) { + Properties connParams = new Properties(); + for (Map.Entry<String, String> entry : properties) { + if(PARAMETERS.containsKey(entry.getKey()) && PARAMETERS.get(entry.getKey()).type() == CONNECTION_PARAM) { + Pair<String, String> keyValue = + ((ConnectionParamAction)PARAMETERS.get(entry.getKey())).doAction(entry.getValue()); + connParams.put(keyValue.getFirst(), keyValue.getSecond()); + } + } + + return connParams; + } + + public static Map<String, String> getSessionVars(Collection<Map.Entry<String, String>> properties) { + Map<String, String> sessionVars = new HashMap<>(); + + for (Map.Entry<String, String> entry : properties) { + 
if(PARAMETERS.containsKey(entry.getKey()) && PARAMETERS.get(entry.getKey()).type() == SESSION_UPDATE) { + Pair<String, String> keyValue = + ((SessionAction)PARAMETERS.get(entry.getKey())).doAction(entry.getValue()); + sessionVars.put(keyValue.getFirst(), keyValue.getSecond()); + } + } + + return sessionVars; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java new file mode 100644 index 0000000..e3ee019 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/ClientParameters.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +/** + * Client Parameters which can enable or disable some features of TajoClient. + * This class contains the parameter keys. 
For more details, + * please refer to http://tajo.apache.org/docs/current/jdbc_driver.html#connection-parameters + */ +public interface ClientParameters { + String USE_COMPRESSION = "useCompression"; + String ROW_FETCH_SIZE = "defaultRowFetchSize"; + String CONNECT_TIMEOUT = "connectTimeout"; + String SOCKET_TIMEOUT = "socketTimeout"; + String RETRY = "retry"; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 144e3b6..364292e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -50,6 +50,7 @@ import java.net.InetSocketAddress; import java.sql.ResultSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -352,7 +353,7 @@ public class QueryClientImpl implements QueryClient { protected TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws TajoException { - boolean compress = conn.getProperties().getBool(SessionVars.COMPRESSED_RESULT_TRANSFER); + final boolean compress = conn.getProperties().getBool(ClientParameters.USE_COMPRESSION); final BlockingInterface stub = conn.getTMStub(); final GetQueryResultDataRequest.Builder request = GetQueryResultDataRequest.newBuilder(); @@ -544,16 +545,7 @@ public class QueryClientImpl implements QueryClient { try { - qmClient = manager.newClient( - qmAddress, - QueryMasterClientProtocol.class, - false, - manager.getRetries(), - manager.getTimeoutSeconds(), - TimeUnit.SECONDS, - false - ); - + qmClient = manager.newClient(qmAddress, 
QueryMasterClientProtocol.class, false, new Properties()); conn.checkSessionAndGet(conn.getTajoMasterConnection()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 3307ade..f63bb47 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -40,7 +40,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NettyUtils; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse; @@ -51,10 +50,7 @@ import org.apache.tajo.util.ProtoUtil; import java.io.Closeable; import java.net.InetSocketAddress; import java.sql.SQLException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE; @@ -85,6 +81,8 @@ public class SessionConnection implements Closeable { private NettyClientBase client; + private Properties clientParams; + private final KeyValueSet properties; /** @@ -103,8 +101,9 @@ public class SessionConnection implements Closeable { this.properties = properties; this.manager = RpcClientManager.getInstance(); - this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, 
RpcConstants.DEFAULT_RPC_RETRIES)); this.userInfo = UserRoleInfo.getCurrentUser(); + // update the connection parameters to RPC client from connection properties + this.clientParams = ClientParameterHelper.getConnParams(properties.getAllKeyValus().entrySet()); this.eventLoopGroup = NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4); try { @@ -113,6 +112,9 @@ public class SessionConnection implements Closeable { NettyUtils.shutdown(eventLoopGroup); throw e; } + + // update the session variables from connection parameters + updateSessionVariables(ClientParameterHelper.getSessionVars(properties.getAllKeyValus().entrySet())); } public Map<String, String> getClientSideSessionVars() { @@ -130,7 +132,7 @@ public class SessionConnection implements Closeable { // Client do not closed on idle state for support high available this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, - manager.getRetries(), eventLoopGroup); + eventLoopGroup, clientParams); } catch (Throwable t) { throw new TajoRuntimeException(new ClientConnectionException(t)); } @@ -217,7 +219,6 @@ public class SessionConnection implements Closeable { ensureOk(response.getState()); updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - properties.putAll(sessionVarsCache); return Collections.unmodifiableMap(sessionVarsCache); } @@ -362,7 +363,6 @@ public class SessionConnection implements Closeable { } CreateSessionResponse response = null; - try { response = tajoMasterService.createSession(null, builder.build()); } catch (ServiceException se) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java index 44721b3..feee37b 
100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java @@ -21,22 +21,22 @@ package org.apache.tajo.client.v2; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.client.v2.exception.ClientUnableToConnectException; -import java.util.Map; +import java.util.Properties; public class ClientDelegateFactory { public static ClientDelegate newDefaultDelegate(String host, int port, - @Nullable Map<String, String> props) + @Nullable Properties clientParams) throws ClientUnableToConnectException { - return new LegacyClientDelegate(host, port, props); + return new LegacyClientDelegate(host, port, clientParams); } public static ClientDelegate newDefaultDelegate(ServiceDiscovery discovery, - @Nullable Map<String, String> props) + @Nullable Properties clientParams) throws ClientUnableToConnectException { - return new LegacyClientDelegate(discovery, props); + return new LegacyClientDelegate(discovery, clientParams); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java index 0a2c6de..889d974 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java @@ -18,6 +18,7 @@ package org.apache.tajo.client.v2; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.AbstractFuture; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; @@ -40,8 +41,10 @@ import org.apache.tajo.util.NetUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.sql.ResultSet; 
+import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -55,13 +58,15 @@ public class LegacyClientDelegate extends SessionConnection implements ClientDel private QueryClientImpl queryClient; private final ExecutorService executor = Executors.newFixedThreadPool(8); - public LegacyClientDelegate(String host, int port, Map<String, String> props) { - super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, new KeyValueSet(props)); + public LegacyClientDelegate(String host, int port, Properties clientParams) { + super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, + new KeyValueSet(clientParams == null ? new HashMap<String, String>() : Maps.fromProperties(clientParams))); queryClient = new QueryClientImpl(this); } - public LegacyClientDelegate(ServiceDiscovery discovery, Map<String, String> props) { - super(new DelegateServiceTracker(discovery), null, new KeyValueSet(props)); + public LegacyClientDelegate(ServiceDiscovery discovery, Properties clientParams) { + super(new DelegateServiceTracker(discovery), null, + new KeyValueSet(clientParams == null ? 
new HashMap<String, String>() : Maps.fromProperties(clientParams))); queryClient = new QueryClientImpl(this); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java index 2b4a150..dc81742 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java @@ -29,6 +29,7 @@ import java.io.Closeable; import java.io.IOException; import java.sql.ResultSet; import java.util.Map; +import java.util.Properties; public class TajoClient implements Closeable { private static Log LOG = LogFactory.getLog(TajoClient.class); @@ -52,11 +53,11 @@ public class TajoClient implements Closeable { /** * Initialize TajoClient with a hostname and default port 26002. 
* - * @param host Hostname to connect - * @param properties Connection properties + * @param host Hostname to connect + * @param clientParams Client connection parameters */ - public TajoClient(String host, Map<String, String> properties) throws ClientUnableToConnectException { - delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, properties); + public TajoClient(String host, Properties clientParams) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, clientParams); } /** @@ -72,12 +73,12 @@ public class TajoClient implements Closeable { /** * Initialize TajoClient with a hostname and port * - * @param host Hostname to connect - * @param port Port number to connect - * @param properties Connection properties + * @param host Hostname to connect + * @param port Port number to connect + * @param clientParams Client connection parameters */ - public TajoClient(String host, int port, Map<String, String> properties) throws ClientUnableToConnectException { - delegate = ClientDelegateFactory.newDefaultDelegate(host, port, properties); + public TajoClient(String host, int port, Properties clientParams) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, port, clientParams); } /** @@ -92,11 +93,11 @@ public class TajoClient implements Closeable { /** * Initialize TajoClient via service discovery protocol * - * @param discovery Service discovery - * @param properties Connection properties + * @param discovery Service discovery + * @param clientParams Client connection parameters */ - public TajoClient(ServiceDiscovery discovery, Map<String, String> properties) throws ClientUnableToConnectException { - delegate = ClientDelegateFactory.newDefaultDelegate(discovery, properties); + public TajoClient(ServiceDiscovery discovery, Properties clientParams) throws ClientUnableToConnectException { + delegate = 
ClientDelegateFactory.newDefaultDelegate(discovery, clientParams); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 3210323..d7789f8 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -230,6 +230,10 @@ public class TajoConf extends Configuration { // Internal RPC Client INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2), + RPC_CLIENT_RETRY_NUM("tajo.rpc.client.retry-num", 3, Validators.min("1")), + RPC_CLIENT_CONNECTION_TIMEOUT("tajo.rpc.client.connection-timeout-ms", (long)15 * 1000, Validators.min("0")), + RPC_CLIENT_SOCKET_TIMEOUT("tajo.rpc.client.socket-timeout-ms", (long)180 * 1000, Validators.min("0")), + RPC_CLIENT_HANG_DETECTION_ENABLED("tajo.rpc.client.hang-detection", true, Validators.bool()), // Internal RPC Server MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.rpc.server.worker-thread-num", http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java index 4b4ec97..bc40072 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java @@ -104,9 +104,9 @@ public class SQLExceptionUtil { SQLSTATES.put(ResultCode.INDETERMINATE_DATATYPE, "42P18"); // Client Connection - 
SQLSTATES.put(ResultCode.CLIENT_CONNECTION_EXCEPTION, "08001"); - SQLSTATES.put(ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "08002"); + SQLSTATES.put(ResultCode.CLIENT_CONNECTION_EXCEPTION, "08000"); SQLSTATES.put(ResultCode.CLIENT_CONNECTION_DOES_NOT_EXIST, "08003"); + SQLSTATES.put(ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "08006"); } public static boolean isThisError(SQLException e, ResultCode code) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-common/src/main/proto/errors.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/errors.proto b/tajo-common/src/main/proto/errors.proto index bb973f2..573fc7e 100644 --- a/tajo-common/src/main/proto/errors.proto +++ b/tajo-common/src/main/proto/errors.proto @@ -172,9 +172,9 @@ enum ResultCode { // Client Connection CLIENT_CONNECTION_EXCEPTION = 1101; // SQLState: 08000 - Client connection error - CLIENT_UNABLE_TO_ESTABLISH_CONNECTION = 1102; // SQLState: 08001 - CLIENT_CONNECTION_DOES_NOT_EXIST = 1103; // SQLState: 08003 - Client connection has been closed. - CLIENT_PROTOCOL_PROTOCOL_VIOLATION = 1104; // SQLState: ? 
+ CLIENT_UNABLE_TO_ESTABLISH_CONNECTION = 1102; // SQLState: 08006 - Client connection failure + CLIENT_PROTOCOL_PROTOCOL_VIOLATION = 1104; // SQLState: 08P01 - Protocol violation // 53 - Invalid Operand or Inconsistent Specification INSUFFICIENT_RESOURCE = 53000; http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java index 3b53c60..cdcdb67 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java @@ -66,7 +66,7 @@ public class TestDefaultCliOutputFormatter { public void setUp() throws Exception { conf = cluster.getConfiguration(); ByteArrayOutputStream out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(conf, new String[]{}, System.in, out); + tajoCli = new TajoCli(conf, new String[]{}, null, System.in, out); cliContext = tajoCli.getContext(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 8ddef09..da51aed 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -29,9 +29,11 @@ import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; 
+import org.apache.tajo.client.ClientParameters; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.FileUtil; @@ -43,6 +45,7 @@ import org.junit.rules.TestName; import java.io.*; import java.net.URL; +import java.util.Properties; import static org.junit.Assert.*; @@ -74,7 +77,9 @@ public class TestTajoCli { @Before public void setUp() throws Exception { out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(cluster.getConfiguration(), new String[]{}, System.in, out); + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, "3"); + tajoCli = new TajoCli(cluster.getConfiguration(), new String[]{}, connParams, System.in, out); } @After @@ -155,7 +160,7 @@ public class TestTajoCli { assertEquals("tajo.executor.join.inner.in-memory-table-num=256", confValues[1]); TajoConf tajoConf = TpchTestBase.getInstance().getTestingCluster().getConfiguration(); - TajoCli testCli = new TajoCli(tajoConf, args, System.in, System.out); + TajoCli testCli = new TajoCli(tajoConf, args, null, System.in, System.out); try { assertEquals("false", testCli.getContext().get(SessionVars.CLI_PAGING_ENABLED)); assertEquals("256", testCli.getContext().getConf().get("tajo.executor.join.inner.in-memory-table-num")); @@ -321,7 +326,7 @@ public class TestTajoCli { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); + TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, null, System.in, out); try { tajoCli.executeMetaCommand("\\getconf tajo.rootdir"); @@ -338,7 +343,7 @@ public class TestTajoCli { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, 
TajoCliOutputTestFormatter.class.getName()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); + TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, null, System.in, out); tajoCli.executeMetaCommand("\\admin -showmasters"); String consoleResult = new String(out.toByteArray()); @@ -372,7 +377,9 @@ public class TestTajoCli { TajoConf tajoConf = new TajoConf(); setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); - TajoCli tc = new TajoCli(tajoConf, new String[]{}, is, out); + Properties connParams = new Properties(); + connParams.setProperty(ClientParameters.RETRY, "3"); + TajoCli tc = new TajoCli(tajoConf, new String[]{}, connParams, is, out); tc.executeMetaCommand("\\set ON_ERROR_STOP false"); assertSessionVar(tc, SessionVars.ON_ERROR_STOP.keyname(), "false"); @@ -452,7 +459,7 @@ public class TestTajoCli { assertEquals(0L, tableDesc.getStats().getNumRows().longValue()); InputStream testInput = new ByteArrayInputStream(new byte[]{(byte) DefaultTajoCliOutputFormatter.QUIT_COMMAND}); - cli = new TajoCli(cluster.getConfiguration(), new String[]{}, testInput, out); + cli = new TajoCli(cluster.getConfiguration(), new String[]{}, null, testInput, out); setVar(cli, SessionVars.CLI_PAGE_ROWS, "2"); setVar(cli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java index 3ebbafa..d1f1023 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java @@ -40,7 +40,7 @@ public class TestTajoCliNegatives extends QueryTestCaseBase { @BeforeClass public static void setUp() throws Exception { out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(testingCluster.getConfiguration(), new String[]{}, System.in, out); + tajoCli = new TajoCli(testingCluster.getConfiguration(), new String[]{}, null, System.in, out); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java index ed7ee4a..95c3a8b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestExecExternalShellCommand.java @@ -34,7 +34,7 @@ public class TestExecExternalShellCommand { ByteArrayOutputStream out = new ByteArrayOutputStream(); - TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out); + TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, null, out); cli.executeMetaCommand("\\! 
echo \"this is test\""); String consoleResult = new String(out.toByteArray()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java index 496c7e3..d239c0a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/commands/TestHdfsCommand.java @@ -37,7 +37,7 @@ public class TestHdfsCommand { System.setOut(new PrintStream(out)); System.setErr(new PrintStream(out)); - TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, out); + TajoCli cli = new TajoCli(tajoConf, new String[]{}, null, null, out); cli.executeMetaCommand("\\dfs -test"); String consoleResult = new String(out.toByteArray()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java new file mode 100644 index 0000000..1d63bba --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/TestRpcParamFactory.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.util; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.rpc.RpcConstants; +import org.junit.Test; + +import java.util.Properties; + +import static org.apache.tajo.rpc.RpcConstants.CLIENT_CONNECTION_TIMEOUT; +import static org.apache.tajo.rpc.RpcConstants.CLIENT_RETRY_NUM; +import static org.junit.Assert.*; + +public class TestRpcParamFactory { + + @Test + public void testGetDefaults() throws Exception { + TajoConf conf = new TajoConf(); + Properties defaultParams = RpcParameterFactory.get(conf); + assertEquals( + ConfVars.RPC_CLIENT_RETRY_NUM.defaultVal, defaultParams.getProperty(CLIENT_RETRY_NUM)); + assertEquals( + ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT.defaultVal, defaultParams.getProperty(CLIENT_CONNECTION_TIMEOUT)); + assertEquals( + ConfVars.RPC_CLIENT_SOCKET_TIMEOUT.defaultVal, defaultParams.getProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT)); + } + + @Test + public void testGet() throws Exception { + TajoConf conf = new TajoConf(); + conf.setIntVar(ConfVars.RPC_CLIENT_RETRY_NUM, 100); + conf.setLongVar(ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT, (long)(10 * 1000)); + conf.setLongVar(ConfVars.RPC_CLIENT_SOCKET_TIMEOUT, (long)60 * 1000); + + Properties defaultParams = RpcParameterFactory.get(conf); + assertEquals("100", defaultParams.getProperty(CLIENT_RETRY_NUM)); + assertEquals(10 * 1000 + "", defaultParams.getProperty(CLIENT_CONNECTION_TIMEOUT)); + assertEquals(60 * 1000 + "", defaultParams.getProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT)); + } +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index b848876..8e999c3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -39,9 +39,11 @@ import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.RpcParameterFactory; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -70,6 +72,8 @@ public class QueryInProgress { private AllocationResourceProto allocationResource; + private final Properties rpcParams; + private final Lock readLock; private final Lock writeLock; @@ -87,6 +91,8 @@ public class QueryInProgress { queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); queryInfo.setStartTime(System.currentTimeMillis()); + rpcParams = RpcParameterFactory.get(masterContext.getConf()); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); @@ -99,7 +105,7 @@ public class QueryInProgress { if (queryMasterRpcClient != null) { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture); - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + 
callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); } } catch (Throwable e) { catchException("Failed to kill query " + queryId + " by exception " + e, e); @@ -182,7 +188,7 @@ public class QueryInProgress { InetSocketAddress addr = NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getQueryMasterPort()); LOG.info("Try to connect to QueryMaster:" + addr); - queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true); + queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true, rpcParams); queryMasterRpcClient = queryMasterRpc.getStub(); } @@ -216,7 +222,7 @@ public class QueryInProgress { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); querySubmitted = true; getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 2f37cee..922a5eb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -54,7 +54,6 @@ import org.apache.tajo.metrics.ClusterResourceMetricSet; import org.apache.tajo.metrics.Master; import org.apache.tajo.plan.function.python.PythonScriptEngine; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rule.EvaluationContext; import 
org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; @@ -172,14 +171,8 @@ public class TajoMaster extends CompositeService { context = new MasterContext(systemConf); clock = new SystemClock(); - RackResolver.init(systemConf); - RpcClientManager rpcManager = RpcClientManager.getInstance(); - rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); - rpcManager.setTimeoutSeconds( - systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); - initResourceManager(); this.dispatcher = new AsyncDispatcher(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index b65b5a9..25695a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -48,6 +48,7 @@ import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; @@ -69,6 +70,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private final TaskSchedulerContext context; private Stage stage; private TajoConf tajoConf; + private Properties rpcParams; private Thread schedulingThread; private volatile boolean isStopped; @@ -83,7 +85,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private int schedulerDelay; private int maximumRequestContainer; - //candidate 
workers for locality of high priority + // candidate workers for locality of high priority private Set<Integer> candidateWorkers = Sets.newHashSet(); public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { @@ -95,6 +97,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { @Override public void init(Configuration conf) { tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + rpcParams = RpcParameterFactory.get(new TajoConf()); + scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); schedulerDelay= tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_DELAY); @@ -294,7 +298,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { ServiceTracker serviceTracker = context.getMasterContext().getQueryMasterContext().getWorkerContext().getServiceTracker(); NettyClientBase tmClient = RpcClientManager.getInstance(). - getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true, rpcParams); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); CallFuture<NodeResourceResponse> callBack = new CallFuture<NodeResourceResponse>(); @@ -310,7 +314,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); - NodeResourceResponse response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); for (AllocationResourceProto resource : response.getResourceList()) { taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); @@ -886,12 +890,14 @@ 
public class DefaultTaskScheduler extends AbstractTaskScheduler { CallFuture<BatchAllocationResponse> callFuture = new CallFuture<BatchAllocationResponse>(); totalAttempts++; try { - tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true, + rpcParams); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); BatchAllocationResponse responseProto = - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); if (responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { @@ -1004,12 +1010,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { AsyncRpcClient tajoWorkerRpc; try { - tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true, + rpcParams); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); BatchAllocationResponse - responseProto = callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); if(responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index cce9482..1b90080 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -45,6 +45,7 @@ import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.HistoryWriter.WriterFuture; import org.apache.tajo.util.history.HistoryWriter.WriterHolder; @@ -52,10 +53,7 @@ import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -91,6 +89,8 @@ public class QueryMaster extends CompositeService implements EventHandler { private RpcClientManager manager; + private Properties rpcClientParams; + private ExecutorService eventExecutor; private ExecutorService singleEventExecutor; @@ -105,6 +105,7 @@ public class QueryMaster extends CompositeService implements EventHandler { this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); this.manager = RpcClientManager.getInstance(); + this.rpcClientParams = RpcParameterFactory.get(this.systemConf); querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); queryMasterContext = new QueryMasterContext(systemConf); @@ -171,7 +172,8 @@ public class QueryMaster extends CompositeService implements EventHandler { // update master address in worker context. 
ServiceTracker serviceTracker = workerContext.getServiceTracker(); - rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true, + rpcClientParams); QueryCoordinatorProtocolService masterService = rpc.getStub(); CallFuture<WorkerConnectionsResponse> callBack = new CallFuture<WorkerConnectionsResponse>(); @@ -179,7 +181,7 @@ public class QueryMaster extends CompositeService implements EventHandler { PrimitiveProtos.NullProto.getDefaultInstance(), callBack); WorkerConnectionsResponse connectionsProto = - callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); return connectionsProto.getWorkerList(); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -294,11 +296,11 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase tmClient; try { tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(), - QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocol.class, true, rpcClientParams); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); - future.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); } catch (Exception e) { //this function will be closed in new thread. 
//When tajo do stop cluster, tajo master maybe throw closed connection exception @@ -404,7 +406,7 @@ public class QueryMaster extends CompositeService implements EventHandler { ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), - QueryCoordinatorProtocol.class, true); + QueryCoordinatorProtocol.class, true, rpcClientParams); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); TajoHeartbeatRequest queryHeartbeat = buildTajoHeartBeat(eachTask); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 46e48e6..bcfb938 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -55,6 +55,7 @@ import org.apache.tajo.session.Session; import org.apache.tajo.storage.FormatProperty; import org.apache.tajo.storage.Tablespace; import org.apache.tajo.storage.TablespaceManager; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; import org.apache.tajo.worker.event.NodeResourceEvent; @@ -95,6 +96,8 @@ public class QueryMasterTask extends CompositeService { private TajoConf systemConf; + private Properties rpcParams; + private AtomicLong lastClientHeartbeat = new AtomicLong(-1); private volatile boolean isStopped; @@ -131,8 +134,8 @@ public class QueryMasterTask extends CompositeService { @Override public void serviceInit(Configuration conf) throws Exception { - systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + rpcParams = 
RpcParameterFactory.get(systemConf); queryTaskContext = new QueryMasterTaskContext(); @@ -255,7 +258,8 @@ public class QueryMasterTask extends CompositeService { InetSocketAddress workerAddress = getQuery().getStage(ebId).getAssignedWorkerMap().get(workerId); try { - tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true); + tajoWorkerRpc = RpcClientManager.getInstance().getClient(workerAddress, TajoWorkerProtocol.class, true, + rpcParams); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); CallFuture<PrimitiveProtos.BoolProto> callFuture = new CallFuture<PrimitiveProtos.BoolProto>(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), callFuture); @@ -472,7 +476,8 @@ public class QueryMasterTask extends CompositeService { @Override public void run() { try { - AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); + AsyncRpcClient rpc = RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, + rpcParams); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); tajoWorkerProtocolService.stopQuery(null, queryId.getProto(), NullCallback.get()); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 282edcc..98ad292 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -62,6 +62,7 @@ import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; 
+import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; @@ -88,6 +89,8 @@ public class Stage implements EventHandler<StageEvent> { private static final Log LOG = LogFactory.getLog(Stage.class); + private final Properties rpcParams; + private MasterPlan masterPlan; private ExecutionBlock block; private int priority; @@ -300,6 +303,8 @@ public class Stage implements EventHandler<StageEvent> { this.block = block; this.eventHandler = context.getEventHandler(); + this.rpcParams = RpcParameterFactory.get(context.getConf()); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); @@ -720,7 +725,7 @@ public class Stage implements EventHandler<StageEvent> { public void run() { try { AsyncRpcClient tajoWorkerRpc = - RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true); + RpcClientManager.getInstance().getClient(worker, TajoWorkerProtocol.class, true, rpcParams); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.stopExecutionBlock(null, requestProto, NullCallback.get(PrimitiveProtos.BoolProto.class)); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java b/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java new file mode 100644 index 0000000..6da4dac --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/RpcParameterFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.util; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.rpc.RpcConstants; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Helper class to get RPC Client Connection Parameters + */ +public class RpcParameterFactory { + + static final Map<String, ConfVars> PROPERTIES_MAP = new HashMap<>(); + + static { + PROPERTIES_MAP.put(RpcConstants.CLIENT_RETRY_NUM, ConfVars.RPC_CLIENT_RETRY_NUM); + PROPERTIES_MAP.put(RpcConstants.CLIENT_CONNECTION_TIMEOUT, ConfVars.RPC_CLIENT_CONNECTION_TIMEOUT); + PROPERTIES_MAP.put(RpcConstants.CLIENT_SOCKET_TIMEOUT, ConfVars.RPC_CLIENT_SOCKET_TIMEOUT); + PROPERTIES_MAP.put(RpcConstants.CLIENT_HANG_DETECTION, ConfVars.RPC_CLIENT_HANG_DETECTION_ENABLED); + } + + public static Properties get(TajoConf conf) { + final Properties properties = new Properties(); + + for (Map.Entry<String, ConfVars> e : PROPERTIES_MAP.entrySet()) { + properties.put(e.getKey(), conf.getVar(e.getValue())); + } + + return properties; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff 
--git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 94bf785..a3cc8fc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -253,7 +253,7 @@ public class ExecutionBlockContext { //If QueryMaster does not responding, current execution block should be stop CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); getStub().fatalError(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); } catch (Exception e) { getWorkerContext().getTaskManager().getDispatcher().getEventHandler() .handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e)); @@ -300,7 +300,7 @@ public class ExecutionBlockContext { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); return; } @@ -355,7 +355,7 @@ public class ExecutionBlockContext { try { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index bc4f9a1..a3b71e1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -35,12 +35,14 @@ import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeStatusEvent; import java.net.ConnectException; import java.util.Collection; import java.util.List; +import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -55,7 +57,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class); - private TajoConf tajoConf; + private TajoConf systemConf; private StatusUpdaterThread updaterThread; private volatile boolean isStopped; private int heartBeatInterval; @@ -63,6 +65,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue; private final TajoWorker.WorkerContext workerContext; private AsyncRpcClient rmClient; + private Properties rpcParams; private ServiceTracker serviceTracker; private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface resourceTracker; private int queueingThreshold; @@ -75,11 +78,12 @@ public class NodeStatusUpdater extends AbstractService 
implements EventHandler<N @Override public void serviceInit(Configuration conf) throws Exception { - this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + this.rpcParams = RpcParameterFactory.get(this.systemConf); this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue(); - this.serviceTracker = ServiceTrackerFactory.get(tajoConf); + this.serviceTracker = ServiceTrackerFactory.get(systemConf); this.workerContext.getNodeResourceManager().getDispatcher().register(NodeStatusEvent.EventType.class, this); - this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL); + this.heartBeatInterval = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL); this.updaterThread = new StatusUpdaterThread(); this.updaterThread.setName("NodeStatusUpdater"); super.serviceInit(conf); @@ -89,10 +93,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N public void serviceStart() throws Exception { DefaultResourceCalculator calculator = new DefaultResourceCalculator(); int maxContainer = calculator.computeAvailableContainers(workerContext.getNodeResourceManager().getTotalResource(), - NodeResources.createResource(tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1)); + NodeResources.createResource(systemConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1)); // if resource changed over than 30%, send reports - float queueingRate = tajoConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE); + float queueingRate = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_HEARTBEAT_QUEUE_THRESHOLD_RATE); this.queueingThreshold = Math.max((int) Math.floor(maxContainer * queueingRate), 1); LOG.info("Queueing threshold:" + queueingThreshold); @@ -149,8 +153,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N RpcClientManager rpcManager = 
RpcClientManager.getInstance(); rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(), - TajoResourceTrackerProtocol.class, true, rpcManager.getRetries(), - rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false); + TajoResourceTrackerProtocol.class, true, rpcParams); return rmClient.getStub(); } @@ -165,7 +168,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<NodeHeartbeatResponse>(); resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack); - response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn(e.getMessage()); } catch (TimeoutException te) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index fbb8d54..607e7ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -152,11 +152,6 @@ public class TajoWorker extends CompositeService { this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); RackResolver.init(systemConf); - RpcClientManager rpcManager = RpcClientManager.getInstance(); - rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); - rpcManager.setTimeoutSeconds( - systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS)); - serviceTracker = ServiceTrackerFactory.get(systemConf); this.workerContext = new TajoWorkerContext(); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index 42db852..a0b3f97 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -35,6 +35,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.*; @@ -42,11 +43,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; -import static org.apache.tajo.ResourceProtos.ExecutionBlockListProto; -import static org.apache.tajo.ResourceProtos.ExecutionBlockContextRequest; -import static org.apache.tajo.ResourceProtos.ExecutionBlockContextResponse; +import static org.apache.tajo.ResourceProtos.*; /** * A TaskManager is responsible for managing executionBlock resource and tasks. 
@@ -58,6 +58,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan private final Map<ExecutionBlockId, ExecutionBlockContext> executionBlockContextMap; private final Dispatcher dispatcher; private TaskExecutor executor; + private final Properties rpcParams; public TaskManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext){ this(dispatcher, workerContext, null); @@ -70,6 +71,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan this.workerContext = workerContext; this.executionBlockContextMap = Maps.newHashMap(); this.executor = executor; + this.rpcParams = RpcParameterFactory.get(this.workerContext.getConf()); } @Override @@ -118,13 +120,13 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan request.setExecutionBlockId(executionBlockId.getProto()) .setWorker(getWorkerContext().getConnectionInfo().getProto()); - client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true); + client = RpcClientManager.getInstance().newClient(address, QueryMasterProtocol.class, true, rpcParams); QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<ExecutionBlockContextResponse>(); stub.getExecutionBlockContext(callback.getController(), request.build(), callback); ExecutionBlockContextResponse contextProto = - callback.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + callback.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client); context.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index f94bd78..ee428cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -26,6 +26,7 @@ import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.worker.TajoWorker; /** @@ -40,7 +41,8 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { RpcClientManager manager = RpcClientManager.getInstance(); ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); - NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); + NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true, RpcParameterFactory.get(tajoConf)); masterClient.getStub(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-docs/src/main/sphinx/jdbc_driver.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/jdbc_driver.rst b/tajo-docs/src/main/sphinx/jdbc_driver.rst index 6c7371b..176477a 100644 --- a/tajo-docs/src/main/sphinx/jdbc_driver.rst +++ b/tajo-docs/src/main/sphinx/jdbc_driver.rst @@ -42,6 +42,49 @@ In order to use the JDBC driver, you should add ``tajo-jdbc-x.y.z.jar`` in your CLASSPATH=path/to/tajo-jdbc-x.y.z.jar:$CLASSPATH +Connecting to the Tajo cluster instance +======================================= +A Tajo cluster is represented by a URL. 
Tajo JDBC driver can take the following URL forms: + + * ``jdbc:tajo://host/`` + * ``jdbc:tajo://host/database`` + * ``jdbc:tajo://host:port/`` + * ``jdbc:tajo://host:port/database`` + +Each part of URL has the following meanings: + + * ``host`` - The hostname of the TajoMaster. You can put hostname or ip address here. + * ``port`` - The port number that server is listening. Default port number is 26002. + * ``database`` - The database name. The default database name is ``default``. + + To connect, you need to get ``Connection`` instance from Java JDBC Driver Manager as follows: + +.. code-block:: java + + Connection db = DriverManager.getConnection(url); + + +Connection Parameters +===================== +Connection parameters lets the JDBC Copnnection to enable or disable additional features. You should use ``java.util.Properties`` to pass your connection parameters into ``Connection``. The following example means that the transmission of ResultSet uses compression and its connection timeout is 15 seconds. + +.. code-block:: java + + String url = "jdbc:tajo://localhost/test"; + Properties props = new Properties(); + props.setProperty("useCompression","true"); // use compression for ResultSet + props.setProperty("connectTimeout","15000"); // 15 seconds + Connection conn = DriverManager.getConnection(url, props); + +The connection parameters that Tajo currently supports are as follows: + + * ``useCompression = bool`` - Enable compressed transfer for ResultSet. + * ``defaultRowFetchSize = int`` - Determine the number of rows fetched in ResultSet by one fetch with trip to the Server. + * ``connectTimeout = int (seconds)`` - The timeout value used for socket connect operations. If connecting to the server takes longer than this value, the connection is broken. The timeout is specified in seconds and a value of zero means that it is disabled. + * ``socketTimeout = int (seconds)`` - The timeout value used for socket read operations. 
If reading from the server takes longer than this value, the connection is closed. This can be used as both a brute force global query timeout and a method of detecting network problems. The timeout is specified in seconds and a value of zero means that it is disabled. + * ``retry = int`` - Number of retry operation. Tajo JDBC driver is resilient against some network or connection problems. It determines how many times the connection will retry. + + An Example JDBC Client ======================= http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-docs/src/main/sphinx/table_management/tablespaces.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/table_management/tablespaces.rst b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst index 964491c..79ea65f 100644 --- a/tajo-docs/src/main/sphinx/table_management/tablespaces.rst +++ b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst @@ -42,4 +42,4 @@ The following is an example for two tablespaces for hbase and hdfs: .. note:: - Also, each tablespace can use different storage type. Please see :doc:`/storage_plugin` if you want to know more information about it. \ No newline at end of file + Also, each tablespace can use different storage type. Please see :doc:`/storage_plugins` if you want to know more information about it. \ No newline at end of file
