Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42bcf2de Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42bcf2de Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42bcf2de Branch: refs/heads/index_support Commit: 42bcf2de090bf1bb5b5ec711427654056a2866e2 Parents: 86c97b2 9b3824b Author: Jihoon Son <[email protected]> Authored: Fri May 8 01:35:19 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Fri May 8 01:35:19 2015 +0900 ---------------------------------------------------------------------- CHANGES | 10 + .../tajo/catalog/AbstractCatalogClient.java | 638 +++++++------------ .../org/apache/tajo/catalog/CatalogClient.java | 49 +- .../java/org/apache/tajo/catalog/Schema.java | 12 +- .../org/apache/tajo/catalog/CatalogServer.java | 8 +- .../tajo/catalog/LocalCatalogWrapper.java | 20 +- .../tajo/client/CatalogAdminClientImpl.java | 518 ++++++++------- .../org/apache/tajo/client/QueryClientImpl.java | 469 ++++++++------ .../apache/tajo/client/SessionConnection.java | 351 +++++----- .../java/org/apache/tajo/conf/TajoConf.java | 2 - .../org/apache/tajo/master/QueryInProgress.java | 31 +- .../querymaster/QueryMasterManagerService.java | 135 ++-- .../tajo/worker/ExecutionBlockContext.java | 32 +- .../java/org/apache/tajo/worker/TajoWorker.java | 1 + .../tajo/worker/TajoWorkerManagerService.java | 2 + .../main/java/org/apache/tajo/worker/Task.java | 4 +- .../java/org/apache/tajo/worker/TaskRunner.java | 43 +- .../src/main/proto/QueryMasterProtocol.proto | 14 +- .../cli/tsql/TestDefaultCliOutputFormatter.java | 4 - .../tajo/engine/planner/TestLogicalPlanner.java | 23 +- .../tajo/engine/query/TestInsertQuery.java | 19 + .../apache/tajo/querymaster/TestKillQuery.java | 63 +- .../TestInsertQuery/nation_diff_col_order.ddl | 1 + .../testInsertWithDifferentColumnOrder.sql | 1 + .../testInsertWithDifferentColumnOrder.result | 27 + .../org/apache/tajo/plan/LogicalPlanner.java | 9 +- .../org/apache/tajo/rpc/NettyClientBase.java | 10 +- .../tajo/rpc/RetriesExhaustedException.java | 104 --- .../org/apache/tajo/rpc/RpcClientManager.java | 9 + .../org/apache/tajo/rpc/ServerCallable.java | 148 ----- .../org/apache/tajo/rpc/TestBlockingRpc.java | 39 -- .../org/apache/tajo/storage/rcfile/RCFile.java | 14 +- .../tajo/storage/text/CSVLineDeserializer.java | 18 +- .../org/apache/tajo/storage/TestStorages.java | 59 ++ 34 files changed, 1365 insertions(+), 1522 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index b52b2f5,766f6c2..c872f8b --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@@ -29,17 -29,12 +29,13 @@@ import org.apache.tajo.catalog.proto.Ca import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; - import org.apache.tajo.rpc.NettyClientBase; - import org.apache.tajo.rpc.RpcClientManager; - import org.apache.tajo.rpc.ServerCallable; + import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; - import org.apache.tajo.service.ServiceTracker; - import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.TUtil; - import java.net.InetSocketAddress; + import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@@ -373,37 -269,14 +270,26 @@@ public abstract class AbstractCatalogCl } @Override + public List<IndexDescProto> getAllIndexes() { + try { - return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - - @Override - public List<IndexDescProto> call(NettyClientBase client) throws Exception { - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); - return response.getIndexList(); - } - }.withRetries(); ++ CatalogProtocolService.BlockingInterface stub = getStub(); ++ GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); ++ return response.getIndexList(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) { try { - return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public PartitionMethodDesc call(NettyClientBase client) throws ServiceException { - - TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); - builder.setDatabaseName(databaseName); - builder.setTableName(tableName); + TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); + return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build())); } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@@ -637,42 -458,15 +471,42 @@@ } @Override - public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) { + public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) { + return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns)); + } + + @Override + public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) { try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); - builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - for (String colunName : columnNames) { - builder.addColumnNames(colunName); - } - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); ++//<<<<<<< HEAD ++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); + builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); ++ for (String colunName : columnNames) { ++ builder.addColumnNames(colunName); ++ } - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existIndexByColumnNames(null, builder.build()).getValue(); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); - return stub.existIndexByColumn(null, builder.build()).getValue(); ++ return stub.existIndexByColumnNames(null, builder.build()).getValue(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return false; + } + } + + @Override + public boolean existIndexesByTable(final String databaseName, final String tableName) { + try { - return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public Boolean call(NettyClientBase client) throws ServiceException { - - CatalogProtocolService.BlockingInterface stub = getStub(client); - return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue(); - } - }.withRetries(); ++ CatalogProtocolService.BlockingInterface stub = getStub(); ++ return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue(); ++//======= ++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); ++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); ++// builder.setColumnName(columnName); ++// ++// CatalogProtocolService.BlockingInterface stub = getStub(); ++// return stub.existIndexByColumn(null, builder.build()).getValue(); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@@ -699,62 -488,17 +528,61 @@@ } } + private static String[] extractColumnNames(Column[] columns) { + String[] columnNames = new String [columns.length]; + for (int i = 0; i < columnNames.length; i++) { + columnNames[i] = columns[i].getSimpleName(); + } + return columnNames; + } + + @Override + public final IndexDesc getIndexByColumns(final String databaseName, + final String tableName, + final Column [] columns) { + return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns)); + } + @Override - public final IndexDesc getIndexByColumn(final String databaseName, - final String tableName, - final String columnName) { + public final IndexDesc getIndexByColumnNames(final String databaseName, + final String tableName, + final String [] columnNames) { try { - return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - public IndexDesc call(NettyClientBase client) throws ServiceException { - - GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); - builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - for (String columnName : columnNames) { - builder.addColumnNames(columnName); - } - GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); ++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); ++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); ++// builder.setColumnName(columnName); ++// ++//<<<<<<< HEAD ++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); + builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); - builder.setColumnName(columnName); ++ for (String columnName : columnNames) { ++ builder.addColumnNames(columnName); ++ } - CatalogProtocolService.BlockingInterface stub = getStub(client); - return new IndexDesc(stub.getIndexByColumnNames(null, builder.build())); - } - }.withRetries(); + CatalogProtocolService.BlockingInterface stub = getStub(); - return new IndexDesc(stub.getIndexByColumn(null, builder.build())); ++ return new IndexDesc(stub.getIndexByColumnNames(null, builder.build())); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName, + final String tableName) { + try { - return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) { - @Override - public Collection<IndexDesc> call(NettyClientBase client) throws Exception { - TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName); - CatalogProtocolService.BlockingInterface stub = getStub(client); - GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto); - List<IndexDesc> indexDescs = TUtil.newList(); - for (IndexDescProto descProto : response.getIndexDescList()) { - indexDescs.add(new IndexDesc(descProto)); - } - return indexDescs; - } - }.withRetries(); ++ TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName); ++ CatalogProtocolService.BlockingInterface stub = getStub(); ++ GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto); ++ List<IndexDesc> indexDescs = TUtil.newList(); ++ for (IndexDescProto descProto : response.getIndexDescList()) { ++ indexDescs.add(new IndexDesc(descProto)); ++ } ++ return indexDescs; ++//======= ++// CatalogProtocolService.BlockingInterface stub = getStub(); ++// return new IndexDesc(stub.getIndexByColumn(null, builder.build())); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@@ -781,6 -520,18 +604,21 @@@ return false; } } - - @Override - public List<IndexProto> getAllIndexes() { - try { - CatalogProtocolService.BlockingInterface stub = getStub(); - GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); - return response.getIndexList(); - } catch (ServiceException e) { - LOG.error(e.getMessage(), e); - return new ArrayList<IndexProto>(); - } - } ++//<<<<<<< HEAD ++//======= ++// ++// @Override ++// public List<IndexProto> getAllIndexes() { ++// try { ++// CatalogProtocolService.BlockingInterface stub = getStub(); ++// GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); ++// return response.getIndexList(); ++// } catch (ServiceException e) { ++// LOG.error(e.getMessage(), e); ++// return new ArrayList<IndexProto>(); ++// } ++// } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 @Override public final boolean createFunction(final FunctionDesc funcDesc) { http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 5fa1c67,9397fcf..5a04892 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@@ -20,16 -20,15 +20,15 @@@ package org.apache.tajo.client import com.google.protobuf.ServiceException; import org.apache.tajo.annotation.Nullable; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.*; +import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.SQLStates; import org.apache.tajo.rpc.NettyClientBase; - import org.apache.tajo.rpc.ServerCallable; import java.io.IOException; import java.net.URI; @@@ -132,32 -97,25 +97,57 @@@ public class CatalogAdminClientImpl imp final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { - return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setName(tableName); - builder.setSchema(schema.getProto()); - builder.setMeta(meta.getProto()); - builder.setPath(path.toString()); - if (partitionMethodDesc != null) { - builder.setPartition(partitionMethodDesc.getProto()); - } - ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } - - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setName(tableName); + builder.setSchema(schema.getProto()); + builder.setMeta(meta.getProto()); + builder.setPath(path.toString()); + if (partitionMethodDesc != null) { + builder.setPartition(partitionMethodDesc.getProto()); + } + ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { - throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); ++ throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); + } ++ ++//<<<<<<< HEAD ++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// ++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { ++// ++// ++// } ++// ++// }.withRetries(); ++//======= ++// NettyClientBase client = connection.getTajoMasterConnection(); ++// connection.checkSessionAndGet(client); ++// BlockingInterface tajoMasterService = client.getStub(); ++// ++// ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setName(tableName); ++// builder.setSchema(schema.getProto()); ++// builder.setMeta(meta.getProto()); ++// builder.setPath(path.toString()); ++// if (partitionMethodDesc != null) { ++// builder.setPartition(partitionMethodDesc.getProto()); ++// } ++// ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); ++// if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++// return CatalogUtil.newTableDesc(res.getTableDesc()); ++// } else { ++// throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); ++// } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } @Override @@@ -211,169 -156,37 +188,222 @@@ @Override public TableDesc getTableDesc(final String tableName) throws ServiceException { - - return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setTableName(tableName); - ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { - return CatalogUtil.newTableDesc(res.getTableDesc()); - } else { - throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()); - } - } -- - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setTableName(tableName); + ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { + return CatalogUtil.newTableDesc(res.getTableDesc()); + } else { - throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState())); ++ throw new ServiceException(new SQLException(res.getResult().getErrorMessage(), ++ SQLStates.ER_NO_SUCH_TABLE.getState())); + } ++ ++//<<<<<<< HEAD ++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// ++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { ++// ++// ++// } ++// ++// }.withRetries(); ++//======= ++// NettyClientBase client = connection.getTajoMasterConnection(); ++// connection.checkSessionAndGet(client); ++// BlockingInterface tajoMasterService = client.getStub(); ++// ++// ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setTableName(tableName); ++// ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build()); ++// if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++// return CatalogUtil.newTableDesc(res.getTableDesc()); ++// } else { ++// throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState())); ++// } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } @Override public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { - - return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { - - connection.checkSessionAndGet(client); - BlockingInterface tajoMasterService = client.getStub(); - - String paramFunctionName = functionName == null ? "" : functionName; - ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, - connection.convertSessionedString(paramFunctionName)); - if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { - return res.getFunctionsList(); - } else { - throw new SQLException(res.getResult().getErrorMessage()); - } - } -- - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + BlockingInterface tajoMasterService = client.getStub(); + + String paramFunctionName = functionName == null ? "" : functionName; + ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, + connection.convertSessionedString(paramFunctionName)); - if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) { + return res.getFunctionsList(); + } else { - throw new ServiceException(new SQLException(res.getErrorMessage())); ++ throw new ServiceException(res.getResult().getErrorMessage()); ++ } ++ ++//<<<<<<< HEAD ++// return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException { ++// ++// ++// } ++// ++// }.withRetries(); ++//======= ++// NettyClientBase client = connection.getTajoMasterConnection(); ++// connection.checkSessionAndGet(client); ++// BlockingInterface tajoMasterService = client.getStub(); ++// ++// String paramFunctionName = functionName == null ? "" : functionName; ++// ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null, ++// connection.convertSessionedString(paramFunctionName)); ++// if (res.getResultCode() == ClientProtos.ResultCode.OK) { ++// return res.getFunctionsList(); ++// } else { ++// throw new ServiceException(new SQLException(res.getErrorMessage())); ++// } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 + } + + @Override + public IndexDescProto getIndex(final String indexName) throws ServiceException { - return new ServerCallable<IndexDescProto>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public IndexDescProto call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.getIndexWithName(null, - connection.convertSessionedString(indexName)); - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ return tajoMasterService.getIndexWithName(null, ++ connection.convertSessionedString(indexName)); + } + + @Override + public boolean existIndex(final String indexName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public Boolean call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existIndexWithName(null, - connection.convertSessionedString(indexName)).getValue(); - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ return tajoMasterService.existIndexWithName(null, ++ connection.convertSessionedString(indexName)).getValue(); ++// return new ServerCallable<Boolean>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public Boolean call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); + } + + @Override + public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException { - return new ServerCallable<List<IndexDescProto>>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public List<IndexDescProto> call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - GetIndexesResponse response = tajoMasterService.getIndexesForTable(null, - connection.convertSessionedString(tableName)); - if (response.getResult().getResultCode() == ResultCode.OK) { - return response.getIndexesList(); - } else { - throw new SQLException(response.getResult().getErrorMessage()); - } - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ GetIndexesResponse response = tajoMasterService.getIndexesForTable(null, ++ connection.convertSessionedString(tableName)); ++ if (response.getResult().getResultCode() == ResultCode.OK) { ++ return response.getIndexesList(); ++ } else { ++ throw new ServiceException(response.getResult().getErrorMessage()); ++ } ++// return new ServerCallable<List<IndexDescProto>>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public List<IndexDescProto> call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); + } + + @Override + public boolean hasIndexes(final String tableName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public Boolean call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.existIndexesForTable(null, - connection.convertSessionedString(tableName)).getValue(); - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ return tajoMasterService.existIndexesForTable(null, ++ connection.convertSessionedString(tableName)).getValue(); ++ ++// return new ServerCallable<Boolean>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public Boolean call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); + } + + @Override + public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException { - return new ServerCallable<IndexDescProto>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public IndexDescProto call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setTableName(tableName); - for (String eachColumnName : columnNames) { - builder.addColumnNames(eachColumnName); - } - GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build()); - if (response.getResult().getResultCode() == ResultCode.OK) { - return response.getIndexDesc(); - } else { - throw new SQLException(response.getResult().getErrorMessage()); - } - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); ++ builder.setSessionId(connection.sessionId); ++ builder.setTableName(tableName); ++ for (String eachColumnName : columnNames) { ++ builder.addColumnNames(eachColumnName); ++ } ++ GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build()); ++ if (response.getResult().getResultCode() == ResultCode.OK) { ++ return response.getIndexDesc(); ++ } else { ++ throw new ServiceException(response.getResult().getErrorMessage()); + } ++ ++// return new ServerCallable<IndexDescProto>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public IndexDescProto call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); + } + + @Override + public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public Boolean call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setTableName(tableName); - for (String eachColumnName : columnName) { - builder.addColumnNames(eachColumnName); - } - return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue(); - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder(); ++ builder.setSessionId(connection.sessionId); ++ builder.setTableName(tableName); ++ for (String eachColumnName : columnName) { ++ builder.addColumnNames(eachColumnName); ++ } ++ return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue(); ++ ++// return new ServerCallable<Boolean>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public Boolean call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); + } + + @Override + public boolean dropIndex(final String indexName) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, - connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - @Override - public Boolean call(NettyClientBase client) throws Exception { - BlockingInterface tajoMasterService = client.getStub(); - return tajoMasterService.dropIndex(null, - connection.convertSessionedString(indexName)).getValue(); - } - }.withRetries(); ++ NettyClientBase client = connection.getTajoMasterConnection(); ++ connection.checkSessionAndGet(client); ++ BlockingInterface tajoMasterService = client.getStub(); ++ return tajoMasterService.dropIndex(null, ++ connection.convertSessionedString(indexName)).getValue(); ++ ++// return new ServerCallable<Boolean>(connection.manager, ++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { ++// ++// @Override ++// public Boolean call(NettyClientBase client) throws Exception { ++// ++// } ++// }.withRetries(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --cc tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 73abc4c,53889fe..007c010 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@@ -153,28 -153,21 +153,40 @@@ public class QueryClientImpl implement @Override public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException { - - return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - - final QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - - SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); - if (response.getResult().getResultCode() == ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - } - return response; - } - }.withRetries(); + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + + final QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - + SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); - if (response.getResultCode() == ResultCode.OK) { ++ if (response.getResult().getResultCode() == ResultCode.OK) { + connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + } + return response; ++ ++//<<<<<<< HEAD ++// connection.checkSessionAndGet(client); ++// ++// final QueryRequest.Builder builder = QueryRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setQuery(sql); ++// builder.setIsJson(false); ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// ++// ++// ++// } ++// }.withRetries(); ++//======= ++// SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); ++// if (response.getResultCode() == ResultCode.OK) { ++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); ++// } ++// return response; ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } @Override @@@ -369,42 -356,26 +375,60 @@@ throws ServiceException { try { - final ServerCallable<ClientProtos.SerializedResultSet> callable = - new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - builder.setFetchRowNum(fetchRowNum); - try { - GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { - abort(); - throw new ServiceException(response.getResult().getErrorMessage()); - } - - return response.getResultSet(); - } catch (ServiceException e) { - abort(); - throw e; - } catch (Throwable t) { - throw new ServiceException(t.getMessage(), t); - } - } - }; - - ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); ++//<<<<<<< HEAD ++// final ServerCallable<ClientProtos.SerializedResultSet> callable = ++// new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// ++// public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { ++// ++// connection.checkSessionAndGet(client); ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// ++// GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setQueryId(queryId.getProto()); ++// builder.setFetchRowNum(fetchRowNum); ++// try { ++// GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); ++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { ++// abort(); ++// throw new ServiceException(response.getResult().getErrorMessage()); ++// } ++// ++// return response.getResultSet(); ++// } catch (ServiceException e) { ++// abort(); ++// throw e; ++// } catch (Throwable t) { ++// throw new ServiceException(t.getMessage(), t); ++// } ++// } ++// }; ++// ++// ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); ++//======= + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + builder.setFetchRowNum(fetchRowNum); + + GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { - throw new ServiceException(response.getErrorMessage()); ++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { ++ throw new ServiceException(response.getResult().getErrorMessage()); + } + + ClientProtos.SerializedResultSet resultSet = response.getResultSet(); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 return new TajoMemoryResultSet(queryId, - new Schema(serializedResultSet.getSchema()), - serializedResultSet.getSerializedTuplesList(), - serializedResultSet.getSerializedTuplesCount(), + new Schema(resultSet.getSchema()), + resultSet.getSerializedTuplesList(), + resultSet.getSerializedTuplesCount(), getClientSideSessionVars()); } catch (ServiceException e) { throw e; @@@ -416,59 -387,47 +440,92 @@@ @Override public boolean updateQuery(final String sql) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(sql); - builder.setIsJson(false); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - - if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { - connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return true; - } else { - if (response.getResult().hasErrorMessage()) { - System.err.println("ERROR: " + response.getResult().getErrorMessage()); - } - return false; - } + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(sql); + builder.setIsJson(false); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); + - if (response.getResultCode() == ClientProtos.ResultCode.OK) { ++//<<<<<<< HEAD ++// connection.checkSessionAndGet(client); ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// ++// QueryRequest.Builder builder = QueryRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setQuery(sql); ++// builder.setIsJson(false); ++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); ++// ++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { ++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); ++// return true; ++// } else { ++// if (response.getResult().hasErrorMessage()) { ++// System.err.println("ERROR: " + response.getResult().getErrorMessage()); ++// } ++// return false; ++// } ++//======= ++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { + connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return true; + } else { - if (response.hasErrorMessage()) { - LOG.error("ERROR: " + response.getErrorMessage()); ++ if (response.getResult().hasErrorMessage()) { ++ LOG.error("ERROR: " + response.getResult().getErrorMessage()); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } - }.withRetries(); + return false; + } } @Override public boolean updateQueryWithJson(final String json) throws ServiceException { - return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - - connection.checkSessionAndGet(client); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - - QueryRequest.Builder builder = QueryRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQuery(json); - builder.setIsJson(true); - ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { - return true; - } else { - if (response.getResult().hasErrorMessage()) { - System.err.println("ERROR: " + response.getResult().getErrorMessage()); - } - return false; - } ++//<<<<<<< HEAD ++// return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// ++// public Boolean call(NettyClientBase client) throws ServiceException { ++// ++// connection.checkSessionAndGet(client); ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// ++// QueryRequest.Builder builder = QueryRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setQuery(json); ++// builder.setIsJson(true); ++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); ++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { ++// return true; ++// } else { ++// if (response.getResult().hasErrorMessage()) { ++// System.err.println("ERROR: " + response.getResult().getErrorMessage()); ++// } ++// return false; ++// } ++//======= + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + + QueryRequest.Builder builder = QueryRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQuery(json); + builder.setIsJson(true); + ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); - if (response.getResultCode() == ClientProtos.ResultCode.OK) { ++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { + return true; + } else { - if (response.hasErrorMessage()) { - LOG.error("ERROR: " + response.getErrorMessage()); ++ if (response.getResult().hasErrorMessage()) { ++ LOG.error("ERROR: " + response.getResult().getErrorMessage()); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } - }.withRetries(); + return false; + } } @Override @@@ -581,25 -519,20 +617,42 @@@ } public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { - return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - public QueryInfoProto call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); - - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); - if (res.getResult().getResultCode() == ResultCode.OK) { - return res.getQueryInfo(); - } else { - abort(); - throw new ServiceException(res.getResult().getErrorMessage()); - } - } - }.withRetries(); ++//<<<<<<< HEAD ++// return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// public QueryInfoProto call(NettyClientBase client) throws ServiceException { ++// connection.checkSessionAndGet(client); ++// ++// QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); ++// builder.setSessionId(connection.sessionId); ++// builder.setQueryId(queryId.getProto()); ++// ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); ++// if (res.getResult().getResultCode() == ResultCode.OK) { ++// return res.getQueryInfo(); ++// } else { ++// abort(); ++// throw new ServiceException(res.getResult().getErrorMessage()); ++// } ++// } ++// }.withRetries(); ++//======= + NettyClientBase client = connection.getTajoMasterConnection(); + connection.checkSessionAndGet(client); + + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); - if (res.getResultCode() == ResultCode.OK) { ++ if (res.getResult().getResultCode() == ResultCode.OK) { + return res.getQueryInfo(); + } else { - throw new ServiceException(res.getErrorMessage()); ++ throw new ServiceException(res.getResult().getErrorMessage()); + } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { @@@ -611,24 -544,31 +664,42 @@@ InetSocketAddress qmAddress = new InetSocketAddress( queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort()); - return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress, - QueryMasterClientProtocol.class, false) { - public QueryHistoryProto call(NettyClientBase client) throws ServiceException { - connection.checkSessionAndGet(client); + RpcClientManager manager = RpcClientManager.getInstance(); + NettyClientBase queryMasterClient; + try { + queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false, + manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false); + } catch (Exception e) { + throw new ServiceException(e); + } - QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); - builder.setSessionId(connection.sessionId); - builder.setQueryId(queryId.getProto()); + try { + connection.checkSessionAndGet(connection.getTajoMasterConnection()); + ++//<<<<<<< HEAD ++// QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); ++// GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); ++// if (res.getResult().getResultCode() == ResultCode.OK) { ++// return res.getQueryHistory(); ++// } else { ++// abort(); ++// throw new ServiceException(res.getResult().getErrorMessage()); ++// } ++//======= + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + builder.setSessionId(connection.sessionId); + builder.setQueryId(queryId.getProto()); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); - GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); - if (res.getResult().getResultCode() == ResultCode.OK) { - return res.getQueryHistory(); - } else { - abort(); - throw new ServiceException(res.getResult().getErrorMessage()); - } + QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub(); + GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build()); - if (res.getResultCode() == ResultCode.OK) { ++ if (res.getResult().getResultCode() == ResultCode.OK) { + return res.getQueryHistory(); + } else { - throw new ServiceException(res.getErrorMessage()); ++ throw new ServiceException(res.getResult().getErrorMessage()); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } - }.withRetries(); + } finally { + queryMasterClient.close(); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --cc tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index bb15981,84decd5..1bb0e16 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@@ -157,52 -160,43 +160,81 @@@ public class SessionConnection implemen } public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), - TajoMasterClientProtocol.class, false) { - - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - KeyValueSet keyValueSet = new KeyValueSet(); - keyValueSet.putAll(variables); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .setSessionVars(keyValueSet.getProto()).build(); - - SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - - if (response.getResult().getResultCode() == ResultCode.OK) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw new ServiceException(response.getResult().getErrorMessage()); - } - } - }.withRetries(); ++//<<<<<<< HEAD ++// return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), ++// TajoMasterClientProtocol.class, false) { ++// ++// public Map<String, String> call(NettyClientBase client) throws ServiceException { ++// checkSessionAndGet(client); ++// ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// KeyValueSet keyValueSet = new KeyValueSet(); ++// keyValueSet.putAll(variables); ++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() ++// .setSessionId(sessionId) ++// .setSessionVars(keyValueSet.getProto()).build(); ++// ++// SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); ++// ++// if (response.getResult().getResultCode() == ResultCode.OK) { ++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); ++// return Collections.unmodifiableMap(sessionVarsCache); ++// } else { ++// throw new ServiceException(response.getResult().getErrorMessage()); ++// } ++// } ++// }.withRetries(); ++// } ++//======= + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 + + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(variables); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); + + SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); + - if (response.getResultCode() == ResultCode.OK) { ++ if (response.getResult().getResultCode() == ResultCode.OK) { + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); + } else { - throw new ServiceException(response.getMessage()); ++ throw new ServiceException(response.getResult().getErrorMessage()); + } } - public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException { - return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public Map<String, String> call(NettyClientBase client) throws ServiceException { - checkSessionAndGet(client); - - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .addAllUnsetVariables(variables).build(); - - SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); - - if (response.getResult().getResultCode() == ResultCode.OK) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw new ServiceException(response.getResult().getErrorMessage()); - } - } - }.withRetries(); + public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException { + NettyClientBase client = getTajoMasterConnection(); + checkSessionAndGet(client); + ++//<<<<<<< HEAD ++// if (response.getResult().getResultCode() == ResultCode.OK) { ++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); ++// return Collections.unmodifiableMap(sessionVarsCache); ++// } else { ++// throw new ServiceException(response.getResult().getErrorMessage()); ++// } ++// } ++// }.withRetries(); ++//======= + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .addAllUnsetVariables(variables).build(); + + SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); + - if (response.getResultCode() == ResultCode.OK) { ++ if (response.getResult().getResultCode() == ResultCode.OK) { + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); + } else { - throw new ServiceException(response.getMessage()); ++ throw new ServiceException(response.getResult().getErrorMessage()); + } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } void updateSessionVarsCache(Map<String, String> variables) { @@@ -333,55 -308,51 +346,81 @@@ } public boolean reconnect() throws Exception { - return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) { - - public Boolean call(NettyClientBase client) throws ServiceException { - CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); - builder.setUsername(userInfo.getUserName()).build(); - if (baseDatabase != null) { - builder.setBaseDatabaseName(baseDatabase); - } - + CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder(); + builder.setUsername(userInfo.getUserName()).build(); + if (baseDatabase != null) { + builder.setBaseDatabaseName(baseDatabase); + } - // create new session - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); - CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); - if (response.getResult().getResultCode() != ResultCode.OK) { - return false; - } + NettyClientBase client = getTajoMasterConnection(); + ++//<<<<<<< HEAD ++// // create new session ++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ++// CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); ++// if (response.getResult().getResultCode() != ResultCode.OK) { ++// return false; ++// } ++//======= + // create new session + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); - if (response.getResultCode() != ResultCode.OK) { ++ if (response.getResult().getResultCode() != ResultCode.OK) { + return false; + } ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 - // Invalidate some session variables in client cache - sessionId = response.getSessionId(); - Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); - synchronized (sessionVarsCache) { - for (SessionVars var : UPDATE_ON_RECONNECT) { - String value = sessionVars.get(var.keyname()); - if (value != null) { - sessionVarsCache.put(var.keyname(), value); - } - } + // Invalidate some session variables in client cache + sessionId = response.getSessionId(); + Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars()); + synchronized (sessionVarsCache) { + for (SessionVars var : UPDATE_ON_RECONNECT) { + String value = sessionVars.get(var.keyname()); + if (value != null) { + sessionVarsCache.put(var.keyname(), value); } + } + } - // Update the session variables in server side - try { - KeyValueSet keyValueSet = new KeyValueSet(); - keyValueSet.putAll(sessionVarsCache); - ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() - .setSessionId(sessionId) - .setSessionVars(keyValueSet.getProto()).build(); - - if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) { - tajoMasterService.removeSession(null, sessionId); - return false; - } - LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); - return true; - } catch (ServiceException e) { - tajoMasterService.removeSession(null, sessionId); - return false; - } ++//<<<<<<< HEAD ++// // Update the session variables in server side ++// try { ++// KeyValueSet keyValueSet = new KeyValueSet(); ++// keyValueSet.putAll(sessionVarsCache); ++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() ++// .setSessionId(sessionId) ++// .setSessionVars(keyValueSet.getProto()).build(); ++// ++// if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) { ++// tajoMasterService.removeSession(null, sessionId); ++// return false; ++// } ++// LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); ++// return true; ++// } catch (ServiceException e) { ++// tajoMasterService.removeSession(null, sessionId); ++// return false; ++// } ++//======= + // Update the session variables in server side + try { + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.putAll(sessionVarsCache); + ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() + .setSessionId(sessionId) + .setSessionVars(keyValueSet.getProto()).build(); + - if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) { ++ if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) { + tajoMasterService.removeSession(null, sessionId); + return false; ++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } - }.withRetries(); + LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); + return true; + } catch (ServiceException e) { + tajoMasterService.removeSession(null, sessionId); + return false; + } } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index be1cdf2,b1a27fa..7ad658f --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@@ -157,24 -153,55 +153,55 @@@ public class TestKillQuery @Test public final void testIgnoreStageStateFromKilled() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); - QueryId queryId = new QueryId(res.getQueryId()); - cluster.waitForQuerySubmitted(queryId); + SQLAnalyzer analyzer = new SQLAnalyzer(); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); + Session session = LocalTajoTestingUtility.createDummySession(); + CatalogService catalog = cluster.getMaster().getCatalog(); + + LogicalPlanner planner = new LogicalPlanner(catalog); - LogicalOptimizer optimizer = new LogicalOptimizer(conf); ++ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog); + Expr expr = analyzer.parse(queryStr); + LogicalPlan plan = planner.createPlan(defaultContext, expr); + + optimizer.optimize(plan); + + QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0); + QueryContext queryContext = new QueryContext(conf); + MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan); + GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog); + globalPlanner.build(masterPlan); - QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); - Query query = qmt.getQuery(); + CountDownLatch barrier = new CountDownLatch(1); + MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING); + + QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); + QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), + queryId, session, defaultContext, expr.toJson(), dispatch); - // wait for a stage created - cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10); - query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + queryMasterTask.init(conf); + queryMasterTask.getQueryTaskContext().getDispatcher().start(); + queryMasterTask.startQuery(); try{ - cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50); - } finally { - assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState()); + barrier.await(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState()); + } + + Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); + assertNotNull(stage); + + // fire kill event + queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try { + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); + } finally { + queryMasterTask.stop(); } - List<Stage> stages = Lists.newArrayList(query.getStages()); + List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages()); Stage lastStage = stages.get(stages.size() - 1); assertEquals(StageState.KILLED, lastStage.getSynchronizedState()); http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ----------------------------------------------------------------------
