TAJO-1160: Remove Hadoop dependency from tajo-client module. (jinho) Closes #380
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/dbf91f54 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/dbf91f54 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/dbf91f54 Branch: refs/heads/master Commit: dbf91f54c0bd9ee5a8d1ec7f71fdace1af868604 Parents: 161ee9e Author: jhkim <[email protected]> Authored: Sat Feb 14 18:09:28 2015 +0900 Committer: jhkim <[email protected]> Committed: Sat Feb 14 18:09:28 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/cli/tools/TajoAdmin.java | 5 +- .../org/apache/tajo/cli/tools/TajoDump.java | 5 +- .../org/apache/tajo/cli/tools/TajoGetConf.java | 5 +- .../org/apache/tajo/cli/tools/TajoHAAdmin.java | 5 +- .../java/org/apache/tajo/cli/tsql/TajoCli.java | 11 +- tajo-client/pom.xml | 40 --- .../apache/tajo/client/CatalogAdminClient.java | 6 +- .../tajo/client/CatalogAdminClientImpl.java | 9 +- .../org/apache/tajo/client/QueryClientImpl.java | 25 +- .../apache/tajo/client/SessionConnection.java | 28 +- .../org/apache/tajo/client/TajoClientImpl.java | 57 ++-- .../org/apache/tajo/client/TajoClientUtil.java | 23 +- .../org/apache/tajo/jdbc/FetchResultSet.java | 5 +- .../apache/tajo/jdbc/TajoMemoryResultSet.java | 11 +- .../org/apache/tajo/jdbc/TajoResultSet.java | 212 -------------- .../org/apache/tajo/storage/RowStoreUtil.java | 290 +++++++++++++++++++ tajo-client/src/main/proto/ClientProtos.proto | 4 +- .../main/java/org/apache/tajo/datum/Datum.java | 9 +- .../exception/UnknownDataTypeException.java | 32 ++ .../org/apache/tajo/benchmark/BenchmarkSet.java | 6 +- .../java/org/apache/tajo/benchmark/TPCH.java | 2 +- .../org/apache/tajo/master/QueryManager.java | 20 ++ .../tajo/master/TajoMasterClientService.java | 40 ++- .../apache/tajo/master/exec/QueryExecutor.java | 9 +- .../tajo/webapp/QueryExecutorServlet.java | 19 +- .../main/java/org/apache/tajo/worker/Task.java | 11 +- .../main/resources/webapps/admin/querytasks.jsp | 7 +- .../resources/webapps/worker/querydetail.jsp | 12 +- .../apache/tajo/LocalTajoTestingUtility.java | 8 +- .../java/org/apache/tajo/QueryTestCaseBase.java | 20 +- .../org/apache/tajo/TajoTestingCluster.java | 20 +- .../org/apache/tajo/client/TestTajoClient.java | 37 +-- .../tajo/engine/query/TestJoinBroadcast.java | 12 +- .../tajo/engine/query/TestNullValues.java | 30 +- .../tajo/engine/query/TestSelectQuery.java | 7 +- .../tajo/engine/query/TestTablePartitions.java | 15 +- .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 9 +- .../org/apache/tajo/jdbc/TestResultSet.java | 17 +- .../master/scheduler/TestFifoScheduler.java | 5 +- .../apache/tajo/querymaster/TestKillQuery.java | 3 +- .../tajo/querymaster/TestQueryProgress.java | 12 +- .../org/apache/tajo/worker/TestHistory.java | 5 +- tajo-dist/pom.xml | 3 +- tajo-jdbc/pom.xml | 63 +--- .../org/apache/tajo/jdbc/JdbcConnection.java | 22 +- .../java/org/apache/tajo/jdbc/TajoDriver.java | 4 - .../org/apache/tajo/jdbc/TajoStatement.java | 5 +- .../org/apache/tajo/storage/RowStoreUtil.java | 2 +- .../exception/UnknownDataTypeException.java | 32 -- 50 files changed, 652 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9043a98..025bc14 100644 --- a/CHANGES +++ b/CHANGES @@ -29,6 +29,8 @@ Release 0.10.0 - unreleased IMPROVEMENT + TAJO-1160: Remove Hadoop dependency from tajo-client module. (jinho) + TAJO-1269: Separate cli from tajo-client. (hyunsik) TAJO-1328: Fix deprecated property names in the catalog configuration http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 18b2d99..5497435 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.TajoIdUtils; @@ -146,9 +147,9 @@ public class TajoAdmin { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } else if (hostName == null && port == null) { - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } switch (cmdType) { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java index 7f38a5d..497ccb6 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java @@ -30,6 +30,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; @@ -109,9 +110,9 @@ public class TajoDump { System.exit(-1); } else if (hostName != null && port != null) { conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port); - client = new TajoClientImpl(conf); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf)); } else { - client = new TajoClientImpl(conf); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf)); } PrintWriter writer = new PrintWriter(System.out); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java index aa7620b..756f2ee 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoGetConf.java @@ -23,6 +23,7 @@ import org.apache.commons.cli.*; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.service.ServiceTrackerFactory; import java.io.IOException; import java.io.PrintWriter; @@ -114,9 +115,9 @@ public class TajoGetConf { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } else if (hostName == null && port == null) { - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } processConfKey(writer, param); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java index 4f2d024..127ee8c 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java @@ -24,6 +24,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.service.ServiceTrackerFactory; import java.io.IOException; import java.io.PrintWriter; @@ -126,9 +127,9 @@ public class TajoHAAdmin { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } else if (hostName == null && port == null) { - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index f48a5b4..7d7d0bd 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -34,6 +34,7 @@ import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; import java.io.*; @@ -228,9 +229,9 @@ public class TajoCli { throw new RuntimeException("cannot find valid Tajo server address"); } else if (hostName != null && port != null) { conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port); - client = new TajoClientImpl(conf, baseDatabase); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase); } else if (hostName == null && port == null) { - client = new TajoClientImpl(conf, baseDatabase); + client = new TajoClientImpl(ServiceTrackerFactory.get(conf), baseDatabase); } try { @@ -563,7 +564,8 @@ public class TajoCli { if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { displayFormatter.printResult(sout, sin, desc, responseTime, res); } else { - res = TajoClientUtil.createResultSet(conf, client, response); + int fetchRows = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM); + res = TajoClientUtil.createResultSet(client, response, fetchRows); displayFormatter.printResult(sout, sin, desc, responseTime, res); } } catch (Throwable t) { @@ -624,7 +626,8 @@ public class TajoCli { float responseTime = ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0f); ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId); if (status.hasResult()) { - res = TajoClientUtil.createResultSet(conf, client, queryId, response); + int fetchRows = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM); + res = TajoClientUtil.createResultSet(client, queryId, response, fetchRows); TableDesc desc = new TableDesc(response.getTableDesc()); displayFormatter.printResult(sout, sin, desc, responseTime, res); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml index d266080..0014609 100644 --- a/tajo-client/pom.xml +++ b/tajo-client/pom.xml @@ -195,54 +195,14 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage-hdfs</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> <artifactId>tajo-rpc</artifactId> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <artifactId>protobuf-java</artifactId> - <groupId>com.google.protobuf</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-app</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-api</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java index a36fc0e..1512b24 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java @@ -19,7 +19,6 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; -import org.apache.hadoop.fs.Path; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; @@ -28,6 +27,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import java.io.Closeable; +import java.net.URI; import java.sql.SQLException; import java.util.List; @@ -79,7 +79,7 @@ public interface CatalogAdminClient extends Closeable { * @throws java.sql.SQLException * @throws ServiceException */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta) throws SQLException, ServiceException; /** @@ -95,7 +95,7 @@ public interface CatalogAdminClient extends Closeable { * @throws SQLException * @throws ServiceException */ - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException; http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java index 496161d..6347ad1 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java @@ -19,7 +19,6 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; -import org.apache.hadoop.fs.Path; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -34,10 +33,10 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.ServerCallable; import java.io.IOException; +import java.net.URI; import java.sql.SQLException; import java.util.List; -import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface; public class CatalogAdminClientImpl implements CatalogAdminClient { @@ -125,12 +124,12 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { } @Override - public TableDesc createExternalTable(String tableName, Schema schema, Path path, TableMeta meta) + public TableDesc createExternalTable(String tableName, Schema schema, URI path, TableMeta meta) throws SQLException, ServiceException { return createExternalTable(tableName, schema, path, meta, null); } - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { @@ -147,7 +146,7 @@ public class CatalogAdminClientImpl implements CatalogAdminClient { builder.setName(tableName); builder.setSchema(schema.getProto()); builder.setMeta(meta.getProto()); - builder.setPath(path.toUri().toString()); + builder.setPath(path.toString()); if (partitionMethodDesc != null) { builder.setPartition(partitionMethodDesc.getProto()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 813d5d8..bc89679 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -21,10 +21,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos; +import org.apache.tajo.*; import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -32,8 +29,8 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.QueryMasterClientProtocol; import org.apache.tajo.ipc.TajoMasterClientProtocol; +import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; -import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.util.ProtoUtil; @@ -44,7 +41,6 @@ import java.sql.ResultSet; import java.util.List; import java.util.Map; -import static org.apache.tajo.conf.TajoConf.ConfVars; import static org.apache.tajo.ipc.ClientProtos.*; import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; @@ -52,9 +48,12 @@ import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProto public class QueryClientImpl implements QueryClient { private static final Log LOG = LogFactory.getLog(QueryClientImpl.class); private final SessionConnection connection; + private final int defaultFetchRows; public QueryClientImpl(SessionConnection connection) { this.connection = connection; + this.defaultFetchRows = this.connection.getProperties().getInt(SessionVars.FETCH_ROWNUM.getConfVars().keyname(), + SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal); } @Override @@ -94,7 +93,7 @@ public class QueryClientImpl implements QueryClient { @Override public void closeQuery(QueryId queryId) { - // nothing to do + closeNonForwardQuery(queryId); } @Override @@ -228,7 +227,7 @@ public class QueryClientImpl implements QueryClient { return this.createNullResultSet(queryId); } else { if (response.hasResultSet() || response.hasTableDesc()) { - return TajoClientUtil.createResultSet(connection.getConf() , this, response); + return TajoClientUtil.createResultSet(this, response, defaultFetchRows); } else { return this.createNullResultSet(queryId); } @@ -258,7 +257,7 @@ public class QueryClientImpl implements QueryClient { } else { if (response.hasResultSet() || response.hasTableDesc()) { - return TajoClientUtil.createResultSet(connection.getConf(), this, response); + return TajoClientUtil.createResultSet(this, response, defaultFetchRows); } else { return this.createNullResultSet(queryId); } @@ -333,14 +332,12 @@ public class QueryClientImpl implements QueryClient { GetQueryResultResponse response = getResultResponse(queryId); TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc()); - connection.getConf().setVar(ConfVars.USERNAME, response.getTajoUserName()); - - return new TajoResultSet(this, queryId, connection.getConf(), tableDesc); + return new FetchResultSet(this, tableDesc.getLogicalSchema(), queryId, defaultFetchRows); } @Override public ResultSet createNullResultSet(QueryId queryId) throws IOException { - return new TajoResultSet(this, queryId); + return TajoClientUtil.createNullResultSet(queryId); } @Override @@ -408,7 +405,7 @@ public class QueryClientImpl implements QueryClient { ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); - return new TajoMemoryResultSet( + return new TajoMemoryResultSet(queryId, new Schema(serializedResultSet.getSchema()), serializedResultSet.getSerializedTuplesList(), serializedResultSet.getSerializedTuplesCount(), http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 3e2b9cc..f8762da 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; @@ -34,7 +33,6 @@ import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; import org.jboss.netty.channel.ConnectTimeoutException; @@ -55,8 +53,6 @@ public class SessionConnection implements Closeable { private final Log LOG = LogFactory.getLog(TajoClientImpl.class); - private final TajoConf conf; - final RpcConnectionPool connPool; private final String baseDatabase; @@ -72,21 +68,25 @@ public class SessionConnection implements Closeable { private ServiceTracker serviceTracker; + private KeyValueSet properties; + /** * Connect to TajoMaster * - * @param conf TajoConf * @param tracker TajoMaster address * @param baseDatabase The base database name. It is case sensitive. If it is null, * the 'default' database will be used. + * @param properties configurations * @throws java.io.IOException */ - public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) - throws IOException { + public SessionConnection(ServiceTracker tracker, @Nullable String baseDatabase, + KeyValueSet properties) throws IOException { + + this.properties = properties; + + //TODO separate ConfVars from TajoConf + int workerNum = this.properties.getInt("tajo.rpc.client.worker-thread-num", 4); - this.conf = conf; - this.conf.set("tajo.disk.scheduler.report.interval", "0"); - int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM); // Don't share connection pool per client connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum); userInfo = UserRoleInfo.getCurrentUser(); @@ -109,6 +109,10 @@ public class SessionConnection implements Closeable { return connPool.getConnection(addr, protocolClass, asyncMode); } + protected KeyValueSet getProperties() { + return properties; + } + @SuppressWarnings("unused") public void setSessionId(TajoIdProtos.SessionIdProto sessionId) { this.sessionId = sessionId; @@ -134,10 +138,6 @@ public class SessionConnection implements Closeable { return false; } - public TajoConf getConf() { - return conf; - } - public UserRoleInfo getUserInfo() { return userInfo; } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index f8eef28..e61bea0 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -19,10 +19,8 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryId; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.annotation.ThreadSafe; @@ -31,19 +29,14 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; -import org.apache.tajo.jdbc.TajoResultSet; -import org.apache.tajo.rule.EvaluationContext; -import org.apache.tajo.rule.EvaluationFailedException; -import org.apache.tajo.rule.SelfDiagnosisRuleEngine; -import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.service.ServiceTracker; -import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.util.KeyValueSet; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URI; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; @@ -58,39 +51,41 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que /** * Connect to TajoMaster * - * @param conf TajoConf * @param tracker ServiceTracker to discovery Tajo Client RPC * @param baseDatabase The base database name. It is case sensitive. If it is null, * the 'default' database will be used. + * @param properties configurations * @throws java.io.IOException */ - public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException { - super(conf, tracker, baseDatabase); + public TajoClientImpl(ServiceTracker tracker, @Nullable String baseDatabase, KeyValueSet properties) + throws IOException { + super(tracker, baseDatabase, properties); this.queryClient = new QueryClientImpl(this); this.catalogClient = new CatalogAdminClientImpl(this); - - diagnoseTajoClient(); } - public TajoClientImpl(TajoConf conf) throws IOException { - this(conf, ServiceTrackerFactory.get(conf), null); + /** + * Connect to TajoMaster + * + * @param addr TajoMaster address + * @param baseDatabase The base database name. It is case sensitive. If it is null, + * the 'default' database will be used. + * @param properties configurations + * @throws java.io.IOException + */ + public TajoClientImpl(InetSocketAddress addr, @Nullable String baseDatabase, KeyValueSet properties) + throws IOException { + this(new DummyServiceTracker(addr), baseDatabase, properties); } - public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException { - this(conf, ServiceTrackerFactory.get(conf), baseDatabase); - } - - private void diagnoseTajoClient() throws EvaluationFailedException { - SelfDiagnosisRuleEngine ruleEngine = SelfDiagnosisRuleEngine.getInstance(); - SelfDiagnosisRuleSession ruleSession = ruleEngine.newRuleSession(); - EvaluationContext context = new EvaluationContext(); - - context.addParameter(TajoConf.class.getName(), getConf()); - - ruleSession.withRuleNames("TajoConfValidationRule").fireRules(context); + public TajoClientImpl(ServiceTracker serviceTracker) throws IOException { + this(serviceTracker, null); } + public TajoClientImpl(ServiceTracker serviceTracker, @Nullable String baseDatabase) throws IOException { + this(serviceTracker, baseDatabase, new KeyValueSet()); + } /*------------------------------------------------------------------------*/ // QueryClient wrappers /*------------------------------------------------------------------------*/ @@ -128,7 +123,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que } public ResultSet createNullResultSet(QueryId queryId) throws IOException { - return new TajoResultSet(this, queryId); + return TajoClientUtil.createNullResultSet(queryId); } public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException { @@ -195,12 +190,12 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return catalogClient.existTable(tableName); } - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta) throws SQLException, ServiceException { return catalogClient.createExternalTable(tableName, schema, path, meta); } - public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) throws SQLException, ServiceException { return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java index 744ba1d..ea15aed 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -24,11 +24,9 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; -import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import java.io.IOException; @@ -58,20 +56,19 @@ public class TajoClientUtil { return !isQueryWaitingForSchedule(state) && !isQueryRunning(state); } - public static ResultSet createResultSet(TajoConf conf, TajoClient client, QueryId queryId, - ClientProtos.GetQueryResultResponse response) + public static ResultSet createResultSet(TajoClient client, QueryId queryId, + ClientProtos.GetQueryResultResponse response, int fetchRows) throws IOException { TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); - conf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); - return new TajoResultSet(client, queryId, conf, desc); + return new FetchResultSet(client, desc.getLogicalSchema(), queryId, fetchRows); } - public static ResultSet createResultSet(TajoConf conf, QueryClient client, ClientProtos.SubmitQueryResponse response) + public static ResultSet createResultSet(QueryClient client, ClientProtos.SubmitQueryResponse response, int fetchRows) throws IOException { if (response.hasTableDesc()) { // non-forward query // select * from table1 [limit 10] - int fetchRowNum = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM); + int fetchRowNum = fetchRows; if (response.hasSessionVars()) { for (PrimitiveProtos.KeyValueProto eachKeyValue: response.getSessionVars().getKeyvalList()) { if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) { @@ -85,11 +82,19 @@ public class TajoClientUtil { // simple eval query // select substr('abc', 1, 2) ClientProtos.SerializedResultSet serializedResultSet = response.getResultSet(); - return new TajoMemoryResultSet( + return new TajoMemoryResultSet(new QueryId(response.getQueryId()), new Schema(serializedResultSet.getSchema()), serializedResultSet.getSerializedTuplesList(), response.getMaxRowNum(), client.getClientSideSessionVars()); } } + + public static ResultSet createNullResultSet() { + return new TajoMemoryResultSet(null, new Schema(), null, 0, null); + } + + public static ResultSet createNullResultSet(QueryId queryId) { + return new TajoMemoryResultSet(queryId, new Schema(), null, 0, null); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java index 18b7c1a..06773f4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -21,7 +21,6 @@ package org.apache.tajo.jdbc; import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.client.QueryClient; -import org.apache.tajo.client.TajoClient; import org.apache.tajo.storage.Tuple; import java.io.IOException; @@ -43,6 +42,10 @@ public class FetchResultSet extends TajoResultSetBase { this.schema = schema; } + public QueryId getQueryId() { + return queryId; + } + @Override protected Tuple nextTuple() throws IOException { if (finished) { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java index d0898f5..33cb838 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java @@ -19,6 +19,7 @@ package org.apache.tajo.jdbc; import com.google.protobuf.ByteString; +import org.apache.tajo.QueryId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.Tuple; @@ -30,17 +31,19 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; public class TajoMemoryResultSet extends TajoResultSetBase { + private QueryId queryId; private List<ByteString> serializedTuples; private AtomicBoolean closed = new AtomicBoolean(false); private RowStoreUtil.RowStoreDecoder decoder; - public TajoMemoryResultSet(Schema schema, List<ByteString> serializedTuples, int maxRowNum, + public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> serializedTuples, int maxRowNum, Map<String, String> clientSideSessionVars) { super(clientSideSessionVars); + this.queryId = queryId; this.schema = schema; this.totalRow = maxRowNum; this.serializedTuples = serializedTuples; - decoder = RowStoreUtil.createDecoder(schema); + this.decoder = RowStoreUtil.createDecoder(schema); init(); } @@ -50,6 +53,10 @@ public class TajoMemoryResultSet extends TajoResultSetBase { curRow = 0; } + public QueryId getQueryId() { + return queryId; + } + @Override public synchronized void close() throws SQLException { if (closed.getAndSet(true)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java deleted file mode 100644 index bf33082..0000000 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.jdbc; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.client.QueryClient; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -public class TajoResultSet extends TajoResultSetBase { - private static final int INFINITE_ROW_NUM = Integer.MAX_VALUE; - - private FileSystem fs; - private Scanner scanner; - private QueryClient tajoClient; - private TajoConf conf; - private TableDesc desc; - private Long maxRowNum = null; - private QueryId queryId; - private AtomicBoolean closed = new AtomicBoolean(false); - - public TajoResultSet(QueryClient tajoClient, QueryId queryId) { - super(tajoClient.getClientSideSessionVars()); - this.tajoClient = tajoClient; - this.queryId = queryId; - init(); - } - - public TajoResultSet(QueryClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException { - super(tajoClient.getClientSideSessionVars()); - this.tajoClient = tajoClient; - this.queryId = queryId; - this.conf = conf; - this.desc = table; - initScanner(); - init(); - } - - public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table, long maxRowNum) - throws IOException { - this(tajoClient, queryId, conf, table); - this.maxRowNum = maxRowNum; - initScanner(); - init(); - } - - private void initScanner() throws IOException { - if(desc != null) { - schema = desc.getSchema(); - fs = FileScanner.getFileSystem(conf, new Path(desc.getPath())); - if (maxRowNum != null) { - this.totalRow = maxRowNum; - } else { - this.totalRow = desc.getStats() != null ? desc.getStats().getNumRows() : INFINITE_ROW_NUM; - } - - if (totalRow == 0) { - totalRow = INFINITE_ROW_NUM; - } - - List<Fragment> frags = getFragments(new Path(desc.getPath())); - scanner = new MergeScanner(conf, desc.getSchema(), desc.getMeta(), frags); - } - } - - @Override - protected void init() { - cur = null; - curRow = 0; - } - - public static class FileNameComparator implements Comparator<FileStatus> { - - @Override - public int compare(FileStatus f1, FileStatus f2) { - return f1.getPath().getName().compareTo(f2.getPath().getName()); - } - } - - private List<Fragment> getFragments(Path tablePath) - throws IOException { - List<Fragment> fragments = Lists.newArrayList(); - FileStatus[] files = fs.listStatus(tablePath, new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().charAt(0) != '.'; - } - }); - - - // The files must be sorted in an ascending order of file names - // in order to guarantee the order of a sort operation. - // This is because our distributed sort algorithm outputs - // a sequence of sorted data files, each of which contains sorted rows - // within each file. - Arrays.sort(files, new FileNameComparator()); - - String tbname = tablePath.getName(); - for (int i = 0; i < files.length; i++) { - if (files[i].getLen() == 0) { - continue; - } - fragments.add(new FileFragment(tbname + "_" + i, files[i].getPath(), 0l, files[i].getLen())); - } - return ImmutableList.copyOf(fragments); - } - - @Override - public synchronized void close() throws SQLException { - if (closed.getAndSet(true)) { - return; - } - - try { - if(tajoClient != null && queryId != null && !queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - this.tajoClient.closeQuery(queryId); - } - } catch (Exception e) { - e.printStackTrace(); - } - try { - if(scanner != null) { - this.scanner.close(); - } - //TODO clean temp result file - cur = null; - curRow = -1; - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void beforeFirst() throws SQLException { - try { - if(scanner != null) { - scanner.reset(); - } else { - initScanner(); - } - init(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - protected Tuple nextTuple() throws IOException { - if(scanner == null) { - return null; - } - - if (maxRowNum != null && curRow >= maxRowNum) { - return null; - } - - Tuple tuple = scanner.next(); - if (tuple == null) { - //query is closed automatically by querymaster but scanner is not - scanner.close(); - scanner = null; - } - - return tuple; - } - - public boolean hasResult() { - return scanner != null; - } - - public QueryId getQueryId() { - return queryId; - } - - public TableDesc getTableDesc() { - return desc; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java new file mode 100644 index 0000000..385f99c --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.exception.UnknownDataTypeException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.util.BitArray; + +import java.nio.ByteBuffer; + +/** + * It is a copy from tajo-storage-common module. + */ +public class RowStoreUtil { + public static int[] getTargetIds(Schema inSchema, Schema outSchema) { + int[] targetIds = new int[outSchema.size()]; + int i = 0; + for (Column target : outSchema.getColumns()) { + targetIds[i] = inSchema.getColumnId(target.getQualifiedName()); + i++; + } + + return targetIds; + } + + public static Tuple project(Tuple in, Tuple out, int[] targetIds) { + out.clear(); + for (int idx = 0; idx < targetIds.length; idx++) { + out.put(idx, in.get(targetIds[idx])); + } + return out; + } + + public static RowStoreEncoder createEncoder(Schema schema) { + return new RowStoreEncoder(schema); + } + + public static RowStoreDecoder createDecoder(Schema schema) { + return new RowStoreDecoder(schema); + } + + public static class RowStoreDecoder { + + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreDecoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + + public Tuple toTuple(byte [] bytes) { + nullFlags.clear(); + ByteBuffer bb = ByteBuffer.wrap(bytes); + Tuple tuple = new VTuple(schema.size()); + Column col; + TajoDataTypes.DataType type; + + bb.limit(headerSize); + nullFlags.fromByteBuffer(bb); + bb.limit(bytes.length); + + for (int i =0; i < schema.size(); i++) { + if (nullFlags.get(i)) { + tuple.put(i, DatumFactory.createNullDatum()); + continue; + } + + col = schema.getColumn(i); + type = col.getDataType(); + switch (type.getType()) { + case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; + case BIT: + byte b = bb.get(); + tuple.put(i, DatumFactory.createBit(b)); + break; + + case CHAR: + byte c = bb.get(); + tuple.put(i, DatumFactory.createChar(c)); + break; + + case INT2: + short s = bb.getShort(); + tuple.put(i, DatumFactory.createInt2(s)); + break; + + case INT4: + case DATE: + int i_ = bb.getInt(); + tuple.put(i, DatumFactory.createFromInt4(type, i_)); + break; + + case INT8: + case TIME: + case TIMESTAMP: + long l = bb.getLong(); + tuple.put(i, DatumFactory.createFromInt8(type, l)); + break; + + case INTERVAL: + int month = bb.getInt(); + long milliseconds = bb.getLong(); + tuple.put(i, new IntervalDatum(month, milliseconds)); + break; + + case FLOAT4: + float f = bb.getFloat(); + tuple.put(i, DatumFactory.createFloat4(f)); + break; + + case FLOAT8: + double d = bb.getDouble(); + tuple.put(i, DatumFactory.createFloat8(d)); + break; + + case TEXT: + byte [] _string = new byte[bb.getInt()]; + bb.get(_string); + tuple.put(i, DatumFactory.createText(_string)); + break; + + case BLOB: + byte [] _bytes = new byte[bb.getInt()]; + bb.get(_bytes); + tuple.put(i, DatumFactory.createBlob(_bytes)); + break; + + case INET4: + byte [] _ipv4 = new byte[4]; + bb.get(_ipv4); + tuple.put(i, DatumFactory.createInet4(_ipv4)); + break; + case INET6: + // TODO - to be implemented + throw new UnsupportedException(type.getType().name()); + default: + throw new RuntimeException(new UnknownDataTypeException(type.getType().name())); + } + } + return tuple; + } + + public Schema getSchema() { + return schema; + } + } + + public static class RowStoreEncoder { + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreEncoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + public byte [] toBytes(Tuple tuple) { + nullFlags.clear(); + int size = estimateTupleDataSize(tuple); + ByteBuffer bb = ByteBuffer.allocate(size + headerSize); + bb.position(headerSize); + Column col; + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + nullFlags.set(i); + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case NULL_TYPE: nullFlags.set(i); break; + case BOOLEAN: bb.put(tuple.get(i).asByte()); break; + case BIT: bb.put(tuple.get(i).asByte()); break; + case CHAR: bb.put(tuple.get(i).asByte()); break; + case INT2: bb.putShort(tuple.get(i).asInt2()); break; + case INT4: bb.putInt(tuple.get(i).asInt4()); break; + case INT8: bb.putLong(tuple.get(i).asInt8()); break; + case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break; + case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break; + case TEXT: + byte [] _string = tuple.get(i).asByteArray(); + bb.putInt(_string.length); + bb.put(_string); + break; + case DATE: bb.putInt(tuple.get(i).asInt4()); break; + case TIME: + case TIMESTAMP: + bb.putLong(tuple.get(i).asInt8()); + break; + case INTERVAL: + IntervalDatum interval = (IntervalDatum) tuple.get(i); + bb.putInt(interval.getMonths()); + bb.putLong(interval.getMilliSeconds()); + break; + case BLOB: + byte [] bytes = tuple.get(i).asByteArray(); + bb.putInt(bytes.length); + bb.put(bytes); + break; + case INET4: + byte [] ipBytes = tuple.get(i).asByteArray(); + bb.put(ipBytes); + break; + case INET6: bb.put(tuple.get(i).asByteArray()); break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + byte[] flags = nullFlags.toArray(); + int finalPosition = bb.position(); + bb.position(0); + bb.put(flags); + + bb.position(finalPosition); + bb.flip(); + byte [] buf = new byte [bb.limit()]; + bb.get(buf); + return buf; + } + + // Note that, NULL values are treated separately + private int estimateTupleDataSize(Tuple tuple) { + int size = 0; + Column col; + + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case BOOLEAN: + case BIT: + case CHAR: size += 1; break; + case INT2: size += 2; break; + case DATE: + case INT4: + case FLOAT4: size += 4; break; + case TIME: + case TIMESTAMP: + case INT8: + case FLOAT8: size += 8; break; + case INTERVAL: size += 12; break; + case TEXT: + case BLOB: size += (4 + tuple.get(i).asByteArray().length); break; + case INET4: + case INET6: size += tuple.get(i).asByteArray().length; break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + size += 100; // optimistic reservation + + return size; + } + + public Schema getSchema() { + return schema; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index a9f5498..4c359a2 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -105,8 +105,8 @@ message BriefQueryInfo { required int64 startTime = 3; required int64 finishTime = 4; required string query = 5; - required string queryMasterHost = 6; - required int32 queryMasterPort = 7; + optional string queryMasterHost = 6; + optional int32 queryMasterPort = 7; required float progress = 8; } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java index cfff369..1ff305d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java @@ -19,6 +19,7 @@ package org.apache.tajo.datum; import com.google.gson.annotations.Expose; +import org.apache.tajo.SessionVars; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.exception.InvalidCastException; @@ -32,7 +33,13 @@ public abstract class Datum implements Comparable<Datum>, GsonObject { static boolean abortWhenDivideByZero; static { - initAbortWhenDivideByZero(new TajoConf()); + try { + //TODO separate hadoop configuration from TajoConf + initAbortWhenDivideByZero(new TajoConf()); + } catch (NoClassDefFoundError error) { + abortWhenDivideByZero = Boolean.valueOf(System.getProperty(SessionVars.ARITHABORT.getConfVars().keyname() + , SessionVars.ARITHABORT.getConfVars().defaultVal)); + } } @Expose private final Type type; http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-common/src/main/java/org/apache/tajo/exception/UnknownDataTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UnknownDataTypeException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UnknownDataTypeException.java new file mode 100644 index 0000000..abd9ab5 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/exception/UnknownDataTypeException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.exception; + +public class UnknownDataTypeException extends Exception { + + private static final long serialVersionUID = -2630390595968966164L; + + public UnknownDataTypeException() { + + } + + public UnknownDataTypeException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java index 0304e92..6c02dc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java @@ -18,7 +18,6 @@ package org.apache.tajo.benchmark; -import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.catalog.CatalogConstants; @@ -30,6 +29,7 @@ import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; import java.io.File; @@ -53,11 +53,11 @@ public abstract class BenchmarkSet { String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname); InetSocketAddress addr = NetUtils.createSocketAddr(addressStr); ServiceTracker serviceTracker = new DummyServiceTracker(addr); - tajo = new TajoClientImpl(conf, serviceTracker, null); + tajo = new TajoClientImpl(serviceTracker, null); } else { conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName()); - tajo = new TajoClientImpl(conf); + tajo = new TajoClientImpl(ServiceTrackerFactory.get(conf)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java index e2ea25c..609e49c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java @@ -223,7 +223,7 @@ public class TPCH extends BenchmarkSet { } try { tajo.createExternalTable(tableName, getSchema(tableName), - new Path(dataDir, tableName), meta, partitionMethodDesc); + new Path(dataDir, tableName).toUri(), meta, partitionMethodDesc); } catch (SQLException s) { throw new ServiceException(s); } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index db895ef..8070a7c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryCoordinatorProtocol; @@ -40,6 +41,7 @@ import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; import org.apache.tajo.util.history.HistoryReader; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -151,6 +153,24 @@ public class QueryManager extends CompositeService { } } + public QueryInfo createNewSimpleQuery(QueryContext queryContext, Session session, String sql, LogicalRootNode plan) + throws IOException { + + QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); + QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, + null, plan); + QueryInfo queryInfo = queryInProgress.getQueryInfo(); + queryInfo.setQueryState(TajoProtos.QueryState.QUERY_SUCCEEDED); + queryInfo.setFinishTime(System.currentTimeMillis()); + queryInProgress.stopProgress(); + + synchronized (historyCache) { + historyCache.put(queryInfo.getQueryId(), queryInfo); + } + + return queryInProgress.getQueryInfo(); + } + public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql, String jsonExpr, LogicalRootNode plan) throws Exception { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 6af3248..418c30b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -18,6 +18,7 @@ package org.apache.tajo.master; +import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -44,7 +45,11 @@ import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.exec.NonForwardQueryResultFileScanner; import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.PartitionedTableScanNode; +import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; @@ -395,8 +400,10 @@ public class TajoMasterClientService extends AbstractService { infoBuilder.setStartTime(queryInfo.getStartTime()); infoBuilder.setFinishTime(System.currentTimeMillis()); infoBuilder.setProgress(queryInfo.getProgress()); - infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort()); - infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); + if(queryInfo.getQueryMasterHost() != null){ + infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort()); + infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); + } builder.addQueryList(infoBuilder.build()); } @@ -428,8 +435,10 @@ public class TajoMasterClientService extends AbstractService { infoBuilder.setStartTime(queryInfo.getStartTime()); infoBuilder.setFinishTime(queryInfo.getFinishTime()); infoBuilder.setProgress(queryInfo.getProgress()); - infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort()); - infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); + if(queryInfo.getQueryMasterHost() != null){ + infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort()); + infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); + } builder.addQueryList(infoBuilder.build()); } @@ -517,7 +526,26 @@ public class TajoMasterClientService extends AbstractService { QueryId queryId = new QueryId(request.getQueryId()); NonForwardQueryResultScanner queryResultScanner = session.getNonForwardQueryResultScanner(queryId); if (queryResultScanner == null) { - throw new ServiceException("No NonForwardQueryResultScanner for " + queryId); + QueryInfo queryInfo = context.getQueryJobManager().getFinishedQuery(queryId); + Preconditions.checkNotNull(queryInfo, "QueryInfo cannot be NULL."); + + TableDesc resultTableDesc = queryInfo.getResultDesc(); + Preconditions.checkNotNull(resultTableDesc, "QueryInfo::getResultDesc results in NULL."); + + ScanNode scanNode; + if (resultTableDesc.hasPartition()) { + scanNode = LogicalPlan.createNodeWithoutPID(PartitionedTableScanNode.class); + scanNode.init(resultTableDesc); + } else { + scanNode = LogicalPlan.createNodeWithoutPID(ScanNode.class); + scanNode.init(resultTableDesc); + } + + queryResultScanner = + new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, + resultTableDesc, Integer.MAX_VALUE); + queryResultScanner.init(); + session.addNonForwardQueryResultScanner(queryResultScanner); } List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum()); @@ -569,7 +597,7 @@ public class TajoMasterClientService extends AbstractService { QueryInfo queryInfo = null; if (queryInProgress == null) { - queryInfo = context.getHistoryReader().getQueryInfo(queryId.toString()); + queryInfo = context.getQueryJobManager().getFinishedQuery(queryId); } else { queryInfo = queryInProgress.getQueryInfo(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 0860d63..db82fca 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -221,15 +221,16 @@ public class QueryExecutor { desc.getStats().setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); } - QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId()); + QueryInfo queryInfo = context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query, + (LogicalRootNode) plan.getRootBlock().getRoot()); - NonForwardQueryResultScanner queryResultScanner = - new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow); + NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultFileScanner( + context.getConf(), session.getSessionId(), queryInfo.getQueryId(), scanNode, desc, maxRow); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); - response.setQueryId(queryId.getProto()); + response.setQueryId(queryInfo.getQueryId().getProto()); response.setMaxRowNum(maxRow); response.setTableDesc(desc.getProto()); response.setResultCode(ClientProtos.ResultCode.OK); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 3603b79..d8bb8f1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -9,10 +9,14 @@ import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.client.*; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; -import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.jdbc.FetchResultSet; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.JSPUtil; import org.apache.tajo.util.TajoIdUtils; import org.codehaus.jackson.map.DeserializationConfig; @@ -29,7 +33,10 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -74,7 +81,7 @@ public class QueryExecutorServlet extends HttpServlet { try { tajoConf = new TajoConf(); - tajoClient = new TajoClientImpl(tajoConf); + tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); new QueryRunnerCleaner().start(); } catch (IOException e) { @@ -355,7 +362,7 @@ public class QueryExecutorServlet extends HttpServlet { // non-forwarded INSERT INTO query does not have any query id. // In this case, it just returns succeeded query information without printing the query results. } else { - res = TajoClientUtil.createResultSet(tajoConf, tajoClient, response); + res = TajoClientUtil.createResultSet(tajoClient, response, sizeLimit); MakeResultText(res, desc); } progress.set(100); @@ -436,7 +443,7 @@ public class QueryExecutorServlet extends HttpServlet { ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId); TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc()); tajoConf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName()); - res = new TajoResultSet(tajoClient, queryId, tajoConf, desc); + res = new FetchResultSet(tajoClient, desc.getLogicalSchema(), queryId, sizeLimit); MakeResultText(res, desc); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 8f84a9d..df3be12 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -131,6 +131,7 @@ public class Task { this.context.setEnforcer(request.getEnforcer()); this.context.setState(TaskAttemptState.TA_PENDING); this.inputStats = new TableStats(); + this.fetcherRunners = Lists.newArrayList(); } public void initPlan() throws IOException { @@ -212,7 +213,7 @@ public class Task { } } // for localizing the intermediate data - localize(request); + fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); } } @@ -220,14 +221,6 @@ public class Task { return taskId; } - public static Log getLog() { - return LOG; - } - - public void localize(TaskRequest request) throws IOException { - fetcherRunners = getFetchRunners(context, request.getFetches()); - } - public TaskAttemptId getId() { return context.getTaskId(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/resources/webapps/admin/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp index 09d9e2e..d08c0d9 100644 --- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp @@ -19,9 +19,6 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="org.apache.tajo.util.FileUtil" %> -<%@ page import="org.apache.tajo.util.JSPUtil" %> -<%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.text.NumberFormat" %> <%@ page import="java.text.SimpleDateFormat" %> @@ -32,6 +29,8 @@ <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.TaskHistory" %> +<%@ page import="org.apache.tajo.util.*" %> +<%@ page import="org.apache.commons.lang.math.NumberUtils" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -41,7 +40,7 @@ String startTime = request.getParameter("startTime"); String ebId = request.getParameter("ebid"); - QueryHistory queryHistory = reader.getQueryHistory(queryId); + QueryHistory queryHistory = reader.getQueryHistory(queryId, NumberUtils.toLong(startTime, 0)); List<StageHistory> stageHistories = queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/main/resources/webapps/worker/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index 56bdeba..eb40b4f 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -19,23 +19,25 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> +<%@ page import="org.apache.commons.lang.math.NumberUtils" %> <%@ page import="org.apache.tajo.QueryId" %> +<%@ page import="org.apache.tajo.SessionVars" %> <%@ page import="org.apache.tajo.querymaster.Query" %> <%@ page import="org.apache.tajo.querymaster.QueryMasterTask" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> +<%@ page import="org.apache.tajo.util.history.HistoryReader" %> +<%@ page import="org.apache.tajo.util.history.QueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="org.apache.tajo.worker.TajoWorker" %> <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> -<%@ page import="org.apache.tajo.SessionVars" %> -<%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.StageHistory" %> -<%@ page import="org.apache.tajo.util.history.HistoryReader" %> <% QueryId queryId = TajoIdUtils.parseQueryId(request.getParameter("queryId")); + String startTime = request.getParameter("startTime"); TajoWorker tajoWorker = (TajoWorker) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext() @@ -53,7 +55,7 @@ } } else { HistoryReader reader = tajoWorker.getWorkerContext().getHistoryReader(); - queryHistory = reader.getQueryHistory(queryId.toString()); + queryHistory = reader.getQueryHistory(queryId.toString(), NumberUtils.toLong(startTime, 0)); } if (!runningQuery && queryHistory == null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java index 7b5e610..801c71f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java +++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java @@ -27,11 +27,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; @@ -103,7 +105,7 @@ public class LocalTajoTestingUtility { util = new TajoTestingCluster(); util.startMiniCluster(1); conf = util.getConfiguration(); - client = new TajoClientImpl(conf); + client = util.newTajoClient(); FileSystem fs = util.getDefaultFileSystem(); Path rootDir = TajoConf.getWarehouseDir(conf); http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 1605560..15fbdae 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -32,18 +32,23 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.cli.tsql.ParsedResult; import org.apache.tajo.cli.tsql.SimpleParser; import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.GlobalEngine; -import org.apache.tajo.plan.*; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.verifier.LogicalPlanVerifier; import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier; import org.apache.tajo.plan.verifier.VerificationState; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.FileUtil; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.rules.TestName; import java.io.File; @@ -53,7 +58,10 @@ import java.net.URL; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import static org.junit.Assert.*; @@ -177,9 +185,9 @@ public class QueryTestCaseBase { @Rule public TestName name = new TestName(); @BeforeClass - public static void setUpClass() throws IOException { + public static void setUpClass() throws Exception { conf = testBase.getTestingCluster().getConfiguration(); - client = new TajoClientImpl(conf); + client = testBase.getTestingCluster().newTajoClient(); } @AfterClass http://git-wip-us.apache.org/repos/asf/tajo/blob/dbf91f54/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 43bd52a..17348e1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -50,6 +50,7 @@ import org.apache.tajo.querymaster.Query; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; @@ -619,17 +620,8 @@ public class TajoTestingCluster { isTajoClusterRunning = false; } - public static TajoClient newTajoClient() throws Exception { - TpchTestBase instance = TpchTestBase.getInstance(); - TajoTestingCluster util = instance.getTestingCluster(); - while(true) { - if(util.getMaster().isMasterRunning()) { - break; - } - Thread.sleep(1000); - } - TajoConf conf = util.getConfiguration(); - return new TajoClientImpl(conf); + public TajoClient newTajoClient() throws Exception { + return new TajoClientImpl(ServiceTrackerFactory.get(getConfiguration())); } public static ResultSet run(String[] names, @@ -665,7 +657,7 @@ public class TajoTestingCluster { Thread.sleep(1000); } TajoConf conf = util.getConfiguration(); - TajoClient client = new TajoClientImpl(conf); + TajoClient client = new TajoClientImpl(ServiceTrackerFactory.get(conf)); try { return run(names, schemas, tableOption, tables, query, client); @@ -690,7 +682,7 @@ public class TajoTestingCluster { Thread.sleep(1000); } TajoConf conf = util.getConfiguration(); - TajoClient client = new TajoClientImpl(conf); + TajoClient client = new TajoClientImpl(ServiceTrackerFactory.get(conf)); try { FileSystem fs = util.getDefaultFileSystem(); Path rootDir = TajoConf.getWarehouseDir(util.getConfiguration()); @@ -720,7 +712,7 @@ public class TajoTestingCluster { } } TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV, tableOption); - client.createExternalTable(tableName, schema, tablePath, meta); + client.createExternalTable(tableName, schema, tablePath.toUri(), meta); } finally { client.close(); }
