Repository: tajo Updated Branches: refs/heads/master cda6c897a -> 7510f886e
TAJO-910: Simple query (non-forwarded query) should be supported against partition tables. Closes #138 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7510f886 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7510f886 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7510f886 Branch: refs/heads/master Commit: 7510f886e7f310dede3f9460ca40ad4bdd46e4cf Parents: cda6c89 Author: HyoungJun Kim <[email protected]> Authored: Mon Sep 22 09:21:04 2014 +0900 Committer: HyoungJun Kim <[email protected]> Committed: Mon Sep 22 09:21:04 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/client/TajoClient.java | 117 +++++++++++-- .../org/apache/tajo/jdbc/FetchResultSet.java | 91 ++++++++++ .../org/apache/tajo/jdbc/TajoResultSetBase.java | 2 +- tajo-client/src/main/proto/ClientProtos.proto | 29 +++- .../main/proto/TajoMasterClientProtocol.proto | 4 +- .../main/java/org/apache/tajo/SessionVars.java | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 5 +- .../java/org/apache/tajo/benchmark/TPCH.java | 32 +++- .../apache/tajo/engine/planner/PlannerUtil.java | 138 +++++++++++++++- .../org/apache/tajo/master/GlobalEngine.java | 31 ++-- .../master/NonForwardQueryResultScanner.java | 165 +++++++++++++++++++ .../tajo/master/TajoMasterClientService.java | 69 +++++++- .../org/apache/tajo/master/session/Session.java | 54 ++++++ .../tajo/master/session/SessionManager.java | 13 +- .../apache/tajo/worker/TaskAttemptContext.java | 2 - .../org/apache/tajo/TajoTestingCluster.java | 45 +++-- .../tajo/engine/planner/TestPlannerUtil.java | 62 +++++++ .../tajo/engine/query/TestNullValues.java | 32 +++- .../tajo/engine/query/TestSelectQuery.java | 15 ++ .../org/apache/tajo/jdbc/TestResultSet.java | 10 +- .../apache/tajo/master/TestGlobalPlanner.java | 68 +++++++- .../queries/TestSelectQuery/customer_ddl.sql | 9 + .../TestSelectQuery/insert_into_customer.sql | 11 ++ ...testSimpleQueryWithLimitPartitionedTable.sql | 1 + ...tSimpleQueryWithLimitPartitionedTable.result | 12 ++ .../TestTajoCli/testHelpSessionVars.result | 1 + 27 files changed, 936 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index b5e4842..21bdd8c 100644 --- a/CHANGES +++ b/CHANGES @@ -31,6 +31,9 @@ Release 0.9.0 - unreleased IMPROVEMENT + TAJO-910: Simple query (non-forwarded query) should be supported against + partition tables. (Hyoungjun Kim) + TAJO-1035: Add default TAJO_PULLSERVER_HEAPSIZE. (Hyoungjun Kim) TAJO-1049: Remove the parallel degree limit up to the maximum cluster http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index ab3d874..3d61cce 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.annotation.Nullable; @@ -33,7 +34,10 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto; import org.apache.tajo.cli.InvalidClientSessionException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -43,12 +47,14 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol; import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService; import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; +import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.SQLStates; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; @@ -63,7 +69,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet; +import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet; @ThreadSafe public class TajoClient implements Closeable { @@ -204,6 +210,25 @@ public class TajoClient implements Closeable { } } + public void closeNonForwardQuery(final QueryId queryId) { + NettyClientBase tmClient = null; + try { + tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); + checkSessionAndGet(tmClient); + + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); + + builder.setSessionId(getSessionId()); + builder.setQueryId(queryId.getProto()); + tajoMasterService.closeNonForwardQuery(null, builder.build()); + } catch (Exception e) { + LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); + } finally { + connPool.closeConnection(tmClient); + } + } + private void checkSessionAndGet(NettyClientBase client) throws ServiceException { if (sessionId == null) { TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); @@ -518,23 +543,21 @@ public class TajoClient implements Closeable { public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException { if (response.hasTableDesc()) { - TajoConf conf = new TajoConf(client.getConf()); - conf.setVar(ConfVars.USERNAME, response.getUserName()); - if (response.hasMaxRowNum()) { - return new TajoResultSet( - client, - QueryIdFactory.NULL_QUERY_ID, - conf, - new TableDesc(response.getTableDesc()), - response.getMaxRowNum()); - } else { - return new TajoResultSet( - client, - QueryIdFactory.NULL_QUERY_ID, - conf, - new TableDesc(response.getTableDesc())); + // non-forward query + // select * from table1 [limit 10] + int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM); + if (response.hasSessionVariables()) { + for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) { + if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) { + fetchRowNum = Integer.parseInt(eachKeyValue.getValue()); + } + } } + TableDesc tableDesc = new TableDesc(response.getTableDesc()); + return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum); } else { + // simple eval query + // select substr('abc', 1, 2) SerializedResultSet serializedResultSet = response.getResultSet(); return new TajoMemoryResultSet( new Schema(serializedResultSet.getSchema()), @@ -606,6 +629,44 @@ public class TajoClient implements Closeable { } } + public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException { + try { + ServerCallable<SerializedResultSet> callable = + new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + public SerializedResultSet call(NettyClientBase client) throws ServiceException { + checkSessionAndGet(client); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); + GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); + builder.setSessionId(sessionId); + builder.setQueryId(queryId.getProto()); + builder.setFetchRowNum(fetchRowNum); + try { + GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); + if (response.getResultCode() == ResultCode.ERROR) { + throw new ServiceException(response.getErrorTrace()); + } + + return response.getResultSet(); + } catch (ServiceException e) { + abort(); + throw e; + } catch (Throwable t) { + throw new ServiceException(t.getMessage(), t); + } + } + }; + + SerializedResultSet serializedResultSet = callable.withRetries(); + + return new TajoMemoryResultSet( + new Schema(serializedResultSet.getSchema()), + serializedResultSet.getSerializedTuplesList(), + serializedResultSet.getSerializedTuplesCount()); + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } + } + public boolean updateQuery(final String sql) throws ServiceException { return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { @@ -747,6 +808,25 @@ public class TajoClient implements Closeable { public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, final TableMeta meta) throws SQLException, ServiceException { + return createExternalTable(tableName, schema, path, meta, null); + } + + /** + * Create an external table. + * + * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not. + * If the table name is not qualified, the current database in the session will be used. + * @param schema The schema + * @param path The external table location + * @param meta Table meta + * @param partitionMethodDesc Table partition description + * @return the created table description. + * @throws SQLException + * @throws ServiceException + */ + public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path, + final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) + throws SQLException, ServiceException { return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { public TableDesc call(NettyClientBase client) throws ServiceException, SQLException { @@ -760,6 +840,9 @@ public class TajoClient implements Closeable { builder.setSchema(schema.getProto()); builder.setMeta(meta.getProto()); builder.setPath(path.toUri().toString()); + if (partitionMethodDesc != null) { + builder.setPartition(partitionMethodDesc.getProto()); + } TableResponse res = tajoMasterService.createExternalTable(null, builder.build()); if (res.getResultCode() == ResultCode.OK) { return CatalogUtil.newTableDesc(res.getTableDesc()); @@ -915,7 +998,7 @@ public class TajoClient implements Closeable { checkSessionAndGet(tmClient); - KillQueryRequest.Builder builder = KillQueryRequest.newBuilder(); + QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); builder.setSessionId(sessionId); builder.setQueryId(queryId.getProto()); tajoMasterService.killQuery(null, builder.build()); http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java new file mode 100644 index 0000000..7ebce91 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.jdbc; + +import org.apache.tajo.QueryId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.sql.SQLException; + +public class FetchResultSet extends TajoResultSetBase { + private TajoClient tajoClient; + private QueryId queryId; + private int fetchRowNum; + private TajoMemoryResultSet currentResultSet; + private boolean finished = false; + + public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) { + this.tajoClient = tajoClient; + this.queryId = queryId; + this.fetchRowNum = fetchRowNum; + this.totalRow = Integer.MAX_VALUE; + this.schema = schema; + } + + @Override + protected Tuple nextTuple() throws IOException { + if (finished) { + return null; + } + + try { + Tuple tuple = null; + if (currentResultSet != null) { + currentResultSet.next(); + tuple = currentResultSet.cur; + } + if (currentResultSet == null || tuple == null) { + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + currentResultSet = tajoClient.fetchNextQueryResult(queryId, fetchRowNum); + if (currentResultSet == null) { + finished = true; + return null; + } + + currentResultSet.next(); + tuple = currentResultSet.cur; + } + if (tuple == null) { + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + finished = true; + } + return tuple; + } catch (Throwable t) { + throw new IOException(t.getMessage(), t); + } + } + + @Override + public void close() throws SQLException { + if (currentResultSet != null) { + currentResultSet.close(); + currentResultSet = null; + } + tajoClient.closeNonForwardQuery(queryId); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java index d189c78..78d8bde 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java @@ -764,7 +764,7 @@ public abstract class TajoResultSetBase implements ResultSet { return true; } } catch (IOException e) { - throw new SQLException(e.getMessage()); + throw new SQLException(e.getMessage(), e); } return false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 0359685..23ae6dd 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -86,7 +86,7 @@ message GetQueryResultResponse { required string tajoUserName = 3; } -message KillQueryRequest { +message QueryIdRequest { optional SessionIdProto sessionId = 1; required QueryIdProto queryId = 2; } @@ -115,6 +115,12 @@ message GetQueryStatusRequest { required QueryIdProto queryId = 2; } +message SerializedResultSet { + optional SchemaProto schema = 1; + optional int32 bytesNum = 2; + repeated bytes serializedTuples = 3; +} + message SubmitQueryResponse { required ResultCode resultCode = 1; required QueryIdProto queryId = 2; @@ -124,18 +130,14 @@ message SubmitQueryResponse { optional string queryMasterHost = 5; optional int32 queryMasterPort = 6; - message SerializedResultSet { - optional SchemaProto schema = 1; - optional int32 bytesNum = 2; - repeated bytes serializedTuples = 3; - } - optional SerializedResultSet resultSet = 7; optional TableDescProto tableDesc = 8; optional int32 maxRowNum = 9; optional string errorMessage = 10; optional string errorTrace = 11; + + optional KeyValueSetProto sessionVariables = 12; } message GetQueryStatusResponse { @@ -152,6 +154,19 @@ message GetQueryStatusResponse { optional int32 queryMasterPort = 12; } +message GetQueryResultDataRequest { + required SessionIdProto sessionId = 1; + required QueryIdProto queryId = 2; + required int32 fetchRowNum = 3; +} + +message GetQueryResultDataResponse { + required ResultCode resultCode = 1; + required SerializedResultSet resultSet = 2; + optional string errorMessage = 3; + optional string errorTrace = 4; +} + message GetClusterInfoRequest { optional SessionIdProto sessionId = 1; } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/TajoMasterClientProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto index 9495fb1..1afc069 100644 --- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto @@ -42,13 +42,15 @@ service TajoMasterClientProtocolService { rpc submitQuery(QueryRequest) returns (SubmitQueryResponse); rpc updateQuery(QueryRequest) returns (UpdateQueryResponse); rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse); + rpc getQueryResultData(GetQueryResultDataRequest) returns (GetQueryResultDataResponse); // Query And Resource Management APIs rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse); rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse); rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse); - rpc killQuery(KillQueryRequest) returns (BoolProto); + rpc killQuery(QueryIdRequest) returns (BoolProto); rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); + rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto); // Database Management APIs rpc createDatabase(SessionedStringProto) returns (BoolProto); http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index c32fd43..cc875b2 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -115,6 +115,9 @@ public enum SessionVars implements ConfigKey { ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, "If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT), + // ResultSet ---------------------------------------------------------------- + FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT), + //------------------------------------------------------------------------------- // Only for Unit Testing //------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index a089b54..b5a9b50 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -356,7 +356,10 @@ public class TajoConf extends Configuration { $TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1), // Behavior Control --------------------------------------------------------- - $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false); + $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false), + + // ResultSet --------------------------------------------------------- + $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200) ; public final String varname; http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java index abd9e7f..71d930f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java @@ -26,7 +26,10 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.storage.StorageConstants; @@ -40,6 +43,7 @@ public class TPCH extends BenchmarkSet { public static final String LINEITEM = "lineitem"; public static final String CUSTOMER = "customer"; + public static final String CUSTOMER_PARTS = "customer_parts"; public static final String NATION = "nation"; public static final String PART = "part"; public static final String REGION = "region"; @@ -54,6 +58,7 @@ public class TPCH extends BenchmarkSet { static { tableVolumes.put(LINEITEM, 759863287L); tableVolumes.put(CUSTOMER, 24346144L); + tableVolumes.put(CUSTOMER_PARTS, 707L); tableVolumes.put(NATION, 2224L); tableVolumes.put(PART, 24135125L); tableVolumes.put(REGION, 389L); @@ -98,6 +103,16 @@ public class TPCH extends BenchmarkSet { .addColumn("c_comment", Type.TEXT); // 7 schemas.put(CUSTOMER, customer); + Schema customerParts = new Schema() + .addColumn("c_custkey", Type.INT4) // 0 + .addColumn("c_name", Type.TEXT) // 1 + .addColumn("c_address", Type.TEXT) // 2 + .addColumn("c_phone", Type.TEXT) // 3 + .addColumn("c_acctbal", Type.FLOAT8) // 4 + .addColumn("c_mktsegment", Type.TEXT) // 5 + .addColumn("c_comment", Type.TEXT); // 6 + schemas.put(CUSTOMER_PARTS, customerParts); + Schema nation = new Schema() .addColumn("n_nationkey", Type.INT4) // 0 .addColumn("n_name", Type.TEXT) // 1 @@ -177,6 +192,7 @@ public class TPCH extends BenchmarkSet { public void loadTables() throws ServiceException { loadTable(LINEITEM); loadTable(CUSTOMER); + loadTable(CUSTOMER_PARTS); loadTable(NATION); loadTable(PART); loadTable(REGION); @@ -187,12 +203,24 @@ public class TPCH extends BenchmarkSet { } - private void loadTable(String tableName) throws ServiceException { + public void loadTable(String tableName) throws ServiceException { TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); meta.putOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + PartitionMethodDesc partitionMethodDesc = null; + if (tableName.equals(CUSTOMER_PARTS)) { + Schema expressionSchema = new Schema(); + expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4); + partitionMethodDesc = new PartitionMethodDesc( + tajo.getCurrentDatabase(), + CUSTOMER_PARTS, + CatalogProtos.PartitionType.COLUMN, + "c_nationkey", + expressionSchema); + } try { - tajo.createExternalTable(tableName, getSchema(tableName), new Path(dataDir, tableName), meta); + tajo.createExternalTable(tableName, getSchema(tableName), + new Path(dataDir, tableName), meta, partitionMethodDesc); } catch (SQLException s) { throw new ServiceException(s); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java index 3390758..c4cc254 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java @@ -21,21 +21,31 @@ package org.apache.tajo.engine.planner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tajo.algebra.*; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.eval.*; import org.apache.tajo.engine.exception.InvalidQueryException; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.utils.SchemaUtil; import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.TUtil; +import java.io.IOException; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; public class PlannerUtil { @@ -70,24 +80,43 @@ public class PlannerUtil { // one block, without where clause, no group-by, no-sort, no-join boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1; boolean simpleOperator = rootNode.getChild().getType() == NodeType.LIMIT - || rootNode.getChild().getType() == NodeType.SCAN; + || rootNode.getChild().getType() == NodeType.SCAN || rootNode.getChild().getType() == NodeType.PARTITIONS_SCAN; boolean noOrderBy = !plan.getRootBlock().hasNode(NodeType.SORT); boolean noGroupBy = !plan.getRootBlock().hasNode(NodeType.GROUP_BY); boolean noWhere = !plan.getRootBlock().hasNode(NodeType.SELECTION); boolean noJoin = !plan.getRootBlock().hasNode(NodeType.JOIN); - boolean singleRelation = plan.getRootBlock().hasNode(NodeType.SCAN) - && PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; + boolean singleRelation = + (plan.getRootBlock().hasNode(NodeType.SCAN) || plan.getRootBlock().hasNode(NodeType.PARTITIONS_SCAN)) && + PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1; boolean noComplexComputation = false; if (singleRelation) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); - if (!scanNode.getTableDesc().hasPartition() && scanNode.hasTargets() - && scanNode.getTargets().length == scanNode.getInSchema().size()) { + if (scanNode == null) { + scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); + } + if (scanNode.hasTargets()) { + // If the number of columns in the select clause is s different from table schema, + // This query is not a simple query. + if (scanNode.getTableDesc().hasPartition()) { + // In the case of partitioned table, the actual number of columns is ScanNode.InSchema + partitioned columns + int numPartitionColumns = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema().size(); + if (scanNode.getTargets().length != scanNode.getInSchema().size() + numPartitionColumns) { + return false; + } + } else { + if (scanNode.getTargets().length != scanNode.getInSchema().size()) { + return false; + } + } noComplexComputation = true; for (int i = 0; i < scanNode.getTargets().length; i++) { - noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD; + noComplexComputation = + noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD; if (noComplexComputation) { - noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getNamedColumn().equals(scanNode.getInSchema().getColumn(i)); + noComplexComputation = noComplexComputation && + scanNode.getTargets()[i].getNamedColumn().equals( + scanNode.getTableDesc().getLogicalSchema().getColumn(i)); } if (!noComplexComputation) { return noComplexComputation; @@ -97,7 +126,8 @@ public class PlannerUtil { } return !checkIfDDLPlan(rootNode) && - (simpleOperator && noComplexComputation && isOneQueryBlock && noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); + (simpleOperator && noComplexComputation && isOneQueryBlock && + noOrderBy && noGroupBy && noWhere && noJoin && singleRelation); } /** @@ -762,4 +792,96 @@ public class PlannerUtil { return explains.toString(); } + + /** + * Listing table data file which is not empty. + * If the table is a partitioned table, return file list which has same partition key. + * @param tajoConf + * @param tableDesc + * @param fileIndex + * @param numResultFiles + * @return + * @throws IOException + */ + public static FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc, + int fileIndex, int numResultFiles) throws IOException { + FileSystem fs = tableDesc.getPath().getFileSystem(tajoConf); + + List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); + if (fs.exists(tableDesc.getPath())) { + getNonZeroLengthDataFiles(fs, tableDesc.getPath(), nonZeroLengthFiles, fileIndex, numResultFiles, + new AtomicInteger(0)); + } + + List<FileFragment> fragments = new ArrayList<FileFragment>(); + + //In the case of partitioned table, return same partition key data files. + int numPartitionColumns = 0; + if (tableDesc.hasPartition()) { + numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); + } + String[] previousPartitionPathNames = null; + for (FileStatus eachFile: nonZeroLengthFiles) { + FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); + + if (numPartitionColumns > 0) { + // finding partition key; + Path filePath = fileFragment.getPath(); + Path parentPath = filePath; + String[] parentPathNames = new String[numPartitionColumns]; + for (int i = 0; i < numPartitionColumns; i++) { + parentPath = parentPath.getParent(); + parentPathNames[numPartitionColumns - i - 1] = parentPath.getName(); + } + + // If current partitionKey == previousPartitionKey, add to result. + if (previousPartitionPathNames == null) { + fragments.add(fileFragment); + } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { + fragments.add(fileFragment); + } else { + break; + } + previousPartitionPathNames = parentPathNames; + } else { + fragments.add(fileFragment); + } + } + return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{})); + } + + private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, + int startFileIndex, int numResultFiles, + AtomicInteger currentFileIndex) throws IOException { + if (fs.isDirectory(path)) { + FileStatus[] files = fs.listStatus(path); + if (files != null && files.length > 0) { + for (FileStatus eachFile : files) { + if (result.size() >= numResultFiles) { + return; + } + if (eachFile.isDirectory()) { + getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles, + currentFileIndex); + } else if (eachFile.isFile() && eachFile.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(eachFile); + } + currentFileIndex.incrementAndGet(); + } + } + } + } else { + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus != null && fileStatus.getLen() > 0) { + if (currentFileIndex.get() >= startFileIndex) { + result.add(fileStatus); + } + currentFileIndex.incrementAndGet(); + if (result.size() >= numResultFiles) { + return; + } + } + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 2c62d42..504a792 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -27,10 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.SessionVars; -import org.apache.tajo.TajoConstants; +import org.apache.tajo.*; import org.apache.tajo.algebra.AlterTablespaceSetType; import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.JsonHelper; @@ -39,6 +36,7 @@ import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; @@ -50,6 +48,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.physical.EvalExprExec; +import org.apache.tajo.engine.planner.physical.SeqScanExec; import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; @@ -71,7 +70,7 @@ import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; -import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet; +import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet; public class GlobalEngine extends AbstractService { /** Class Logger */ @@ -217,17 +216,27 @@ public class GlobalEngine extends AbstractService { // Simple query indicates a form of 'select * from tb_name [LIMIT X];'. } else if (PlannerUtil.checkIfSimpleQuery(plan)) { ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN); + if (scanNode == null) { + scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN); + } TableDesc desc = scanNode.getTableDesc(); + int maxRow = Integer.MAX_VALUE; if (plan.getRootBlock().hasNode(NodeType.LIMIT)) { LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT); - responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum()); - } else { - if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) { - responseBuilder.setMaxRowNum(Integer.MAX_VALUE); - } + maxRow = (int) limitNode.getFetchFirstNum(); } + QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId()); + + NonForwardQueryResultScanner queryResultScanner = + new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow); + + queryResultScanner.init(); + session.addNonForwardQueryResultScanner(queryResultScanner); + + responseBuilder.setQueryId(queryId.getProto()); + responseBuilder.setMaxRowNum(maxRow); responseBuilder.setTableDesc(desc.getProto()); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setSessionVariables(session.getProto().getVariables()); responseBuilder.setResultCode(ClientProtos.ResultCode.OK); // NonFromQuery indicates a form of 'select a, x+y;' http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java new file mode 100644 index 0000000..f8c51fd --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master; + +import com.google.protobuf.ByteString; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.QueryUnitId; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.PlannerUtil; +import org.apache.tajo.engine.planner.logical.ScanNode; +import org.apache.tajo.engine.planner.physical.SeqScanExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.master.session.Session; +import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.StorageManagerFactory; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class NonForwardQueryResultScanner { + private static final int MAX_FILE_NUM_PER_SCAN = 100; + + private QueryId queryId; + private String sessionId; + private SeqScanExec scanExec; + private TableDesc tableDesc; + private RowStoreEncoder rowEncoder; + private int maxRow; + private int currentNumRows; + private TaskAttemptContext taskContext; + private TajoConf tajoConf; + private ScanNode scanNode; + + private int currentFileIndex = 0; + + public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId, + QueryId queryId, + ScanNode scanNode, + TableDesc tableDesc, + int maxRow) throws IOException { + this.tajoConf = tajoConf; + this.sessionId = sessionId; + this.queryId = queryId; + this.scanNode = scanNode; + this.tableDesc = tableDesc; + this.maxRow = maxRow; + + this.rowEncoder = RowStoreUtil.createEncoder(tableDesc.getLogicalSchema()); + } + + public void init() throws IOException { + initSeqScanExec(); + } + + private void initSeqScanExec() throws IOException { + FragmentProto[] fragments = PlannerUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc, + currentFileIndex, MAX_FILE_NUM_PER_SCAN); + if (fragments != null && fragments.length > 0) { + this.taskContext = new TaskAttemptContext( + new QueryContext(tajoConf), null, + new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0), + fragments, null); + + try { + // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table. + scanExec = new SeqScanExec(taskContext, + StorageManagerFactory.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments); + } catch (CloneNotSupportedException e) { + throw new IOException(e.getMessage(), e); + } + scanExec.init(); + currentFileIndex += fragments.length; + } + } + + public QueryId getQueryId() { + return queryId; + } + + public String getSessionId() { + return sessionId; + } + + public void setScanExec(SeqScanExec scanExec) { + this.scanExec = scanExec; + } + + public TableDesc getTableDesc() { + return tableDesc; + } + + public void close() throws Exception { + if (scanExec != null) { + scanExec.close(); + scanExec = null; + } + } + + public List<ByteString> getNextRows(int fetchRowNum) throws IOException { + List<ByteString> rows = new ArrayList<ByteString>(); + if (scanExec == null) { + return rows; + } + int rowCount = 0; + + while (true) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + scanExec.close(); + scanExec = null; + + initSeqScanExec(); + if (scanExec != null) { + tuple = scanExec.next(); + } + if (tuple == null) { + if (scanExec != null ) { + scanExec.close(); + scanExec = null; + } + + break; + } + } + rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple)))); + rowCount++; + currentNumRows++; + if (rowCount >= fetchRowNum) { + break; + } + + if (currentNumRows >= maxRow) { + scanExec.close(); + scanExec = null; + break; + } + } + + return rows; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 738b643..65bde27 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -18,6 +18,7 @@ package org.apache.tajo.master; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.commons.lang.exception.ExceptionUtils; @@ -56,6 +57,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.StringUtils; import java.io.IOException; import java.net.InetSocketAddress; @@ -452,6 +454,7 @@ public class TajoMasterClientService extends AbstractService { if (queryInProgress == null) { queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId); } + if (queryInProgress != null) { QueryInfo queryInfo = queryInProgress.getQueryInfo(); builder.setResultCode(ResultCode.OK); @@ -468,14 +471,70 @@ public class TajoMasterClientService extends AbstractService { builder.setFinishTime(System.currentTimeMillis()); } } else { - builder.setResultCode(ResultCode.ERROR); - builder.setErrorMessage("No such query: " + queryId.toString()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + if (session.getNonForwardQueryResultScanner(queryId) != null) { + builder.setResultCode(ResultCode.OK); + builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED); + } else { + builder.setResultCode(ResultCode.ERROR); + builder.setErrorMessage("No such query: " + queryId.toString()); + } } } return builder.build(); } catch (Throwable t) { - throw new ServiceException(t); + throw new ServiceException(t); + } + } + + @Override + public GetQueryResultDataResponse getQueryResultData(RpcController controller, GetQueryResultDataRequest request) + throws ServiceException { + GetQueryResultDataResponse.Builder builder = GetQueryResultDataResponse.newBuilder(); + + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + + QueryId queryId = new QueryId(request.getQueryId()); + NonForwardQueryResultScanner queryResultScanner = session.getNonForwardQueryResultScanner(queryId); + if (queryResultScanner == null) { + throw new ServiceException("No NonForwardQueryResultScanner for " + queryId); + } + + List<ByteString> rows = queryResultScanner.getNextRows(request.getFetchRowNum()); + SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder(); + resultSetBuilder.setSchema(queryResultScanner.getTableDesc().getLogicalSchema().getProto()); + resultSetBuilder.addAllSerializedTuples(rows); + + builder.setResultSet(resultSetBuilder.build()); + builder.setResultCode(ResultCode.OK); + + LOG.info("Send result to client for " + + request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows"); + + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + builder.setResultCode(ResultCode.ERROR); + String errorMessage = t.getMessage() == null ? t.getClass().getName() : t.getMessage(); + builder.setErrorMessage(errorMessage); + builder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t)); + } + return builder.build(); + } + + @Override + public BoolProto closeNonForwardQuery(RpcController controller, QueryIdRequest request) throws ServiceException { + try { + context.getSessionManager().touch(request.getSessionId().getId()); + Session session = context.getSessionManager().getSession(request.getSessionId().getId()); + QueryId queryId = new QueryId(request.getQueryId()); + + session.closeNonForwardQueryResultScanner(queryId); + return BOOL_TRUE; + } catch (Throwable t) { + throw new ServiceException(t); } } @@ -483,7 +542,7 @@ public class TajoMasterClientService extends AbstractService { * It is invoked by TajoContainerProxy. */ @Override - public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException { + public BoolProto killQuery(RpcController controller, QueryIdRequest request) throws ServiceException { try { context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); @@ -492,7 +551,7 @@ public class TajoMasterClientService extends AbstractService { new QueryInfo(queryId))); return BOOL_TRUE; } catch (Throwable t) { - t.printStackTrace(); + LOG.error(t.getMessage(), t); throw new ServiceException(t); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java index 1f21e2a..5f44ecb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java @@ -19,7 +19,11 @@ package org.apache.tajo.master.session; import com.google.common.collect.ImmutableMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; +import org.apache.tajo.master.NonForwardQueryResultScanner; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.common.ProtoObject; @@ -29,10 +33,13 @@ import java.util.Map; import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto; public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable { + private static final Log LOG = LogFactory.getLog(Session.class); + private final String sessionId; private final String userName; private String currentDatabase; private final Map<String, String> sessionVariables; + private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>(); // transient status private volatile long lastAccessTime; @@ -139,4 +146,51 @@ public class Session implements SessionConstants, ProtoObject<SessionProto>, Clo newSession.sessionVariables.putAll(getAllVariables()); return newSession; } + + public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) { + synchronized (nonForwardQueryMap) { + return nonForwardQueryMap.get(queryId); + } + } + + public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) { + synchronized (nonForwardQueryMap) { + nonForwardQueryMap.put(resultScanner.getQueryId(), resultScanner); + } + } + + public void closeNonForwardQueryResultScanner(QueryId queryId) { + NonForwardQueryResultScanner resultScanner; + synchronized (nonForwardQueryMap) { + resultScanner = nonForwardQueryMap.remove(queryId); + } + + if (resultScanner != null) { + try { + resultScanner.close(); + } catch (Exception e) { + LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e); + } + } + } + + public void close() { + try { + synchronized (nonForwardQueryMap) { + for (NonForwardQueryResultScanner eachQueryScanner: nonForwardQueryMap.values()) { + try { + eachQueryScanner.close(); + } catch (Exception e) { + LOG.error("Error while closing NonForwardQueryResultScanner: " + + eachQueryScanner.getSessionId() + ", " + e.getMessage(), e); + } + } + + nonForwardQueryMap.clear(); + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new RuntimeException(t.getMessage(), t); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java index 24df9d8..d701d03 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java @@ -74,12 +74,15 @@ public class SessionManager extends CompositeService implements EventHandler<Ses return sessionId; } - public void removeSession(String sessionId) { + public Session removeSession(String sessionId) { if (sessions.containsKey(sessionId)) { - sessions.remove(sessionId); LOG.info("Session " + sessionId + " is removed."); + Session session = sessions.remove(sessionId); + session.close(); + return session; } else { LOG.error("No such session id: " + sessionId); + return null; } } @@ -132,8 +135,10 @@ public class SessionManager extends CompositeService implements EventHandler<Ses } if (event.getType() == SessionEventType.EXPIRE) { - Session session = sessions.remove(event.getSessionId()); - LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId()); + Session session = removeSession(event.getSessionId()); + if (session != null) { + LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId()); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 422ec2b..96208ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -118,8 +118,6 @@ public class TaskAttemptContext { if (workerContext != null) { this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager(); } else { - // For TestCase - LOG.warn("WorkerContext is null, so create HashShuffleAppenderManager created per a Task."); try { this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index c74a4ec..aec11f6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -573,6 +573,39 @@ public class TajoTestingCluster { LOG.info("Minicluster is down"); } + public static TajoClient newTajoClient() throws Exception { + TpchTestBase instance = TpchTestBase.getInstance(); + TajoTestingCluster util = instance.getTestingCluster(); + while(true) { + if(util.getMaster().isMasterRunning()) { + break; + } + Thread.sleep(1000); + } + TajoConf conf = util.getConfiguration(); + return new TajoClient(conf); + } + + public static ResultSet run(String[] names, + Schema[] schemas, + KeyValueSet tableOption, + String[][] tables, + String query, + TajoClient client) throws Exception { + TajoTestingCluster util = TpchTestBase.getInstance().getTestingCluster(); + + FileSystem fs = util.getDefaultFileSystem(); + Path rootDir = util.getMaster(). + getStorageManager().getWarehouseDir(); + fs.mkdirs(rootDir); + for (int i = 0; i < names.length; i++) { + createTable(names[i], schemas[i], tableOption, tables[i]); + } + Thread.sleep(1000); + ResultSet res = client.executeQueryAndGetResult(query); + return res; + } + public static ResultSet run(String[] names, Schema[] schemas, KeyValueSet tableOption, @@ -588,17 +621,9 @@ public class TajoTestingCluster { } TajoConf conf = util.getConfiguration(); TajoClient client = new TajoClient(conf); + try { - FileSystem fs = util.getDefaultFileSystem(); - Path rootDir = util.getMaster(). - getStorageManager().getWarehouseDir(); - fs.mkdirs(rootDir); - for (int i = 0; i < names.length; i++) { - createTable(names[i], schemas[i], tableOption, tables[i]); - } - Thread.sleep(1000); - ResultSet res = client.executeQueryAndGetResult(query); - return res; + return run(names, schemas, tableOption, tables, query, client); } finally { client.close(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index b370be7..756dadc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -18,11 +18,14 @@ package org.apache.tajo.engine.planner; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.fs.*; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; @@ -34,13 +37,18 @@ import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; @@ -318,4 +326,58 @@ public class TestPlannerUtil { assertTrue(innerComparator.compare(t1, t2) < 0); assertTrue(innerComparator.compare(t2, t1) > 0); } + + @Test + public void testGetNonZeroLengthDataFiles() throws Exception { + String queryFiles = ClassLoader.getSystemResource("queries").toString() + "/TestSelectQuery"; + Path path = new Path(queryFiles); + + TableDesc tableDesc = new TableDesc(); + tableDesc.setName("Test"); + tableDesc.setPath(path); + + FileSystem fs = path.getFileSystem(util.getConfiguration()); + + List<Path> expectedFiles = new ArrayList<Path>(); + RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true); + while (files.hasNext()) { + LocatedFileStatus file = files.next(); + if (file.isFile() && file.getLen() > 0) { + expectedFiles.add(file.getPath()); + } + } + int fileNum = expectedFiles.size() / 5; + + int numResultFiles = 0; + for (int i = 0; i <= 5; i++) { + int start = i * fileNum; + + FragmentProto[] fragments = + PlannerUtil.getNonZeroLengthDataFiles(util.getConfiguration(), tableDesc, start, fileNum); + assertNotNull(fragments); + + numResultFiles += fragments.length; + int expectedSize = fileNum; + if (i == 5) { + //last + expectedSize = expectedFiles.size() - (fileNum * 5); + } + + comparePath(expectedFiles, fragments, start, expectedSize); + } + + assertEquals(expectedFiles.size(), numResultFiles); + } + + private void comparePath(List<Path> expectedFiles, FragmentProto[] fragments, + int startIndex, int expectedSize) throws Exception { + assertEquals(expectedSize, fragments.length); + + int index = 0; + + for (int i = startIndex; i < startIndex + expectedSize; i++, index++) { + FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]); + assertEquals(expectedFiles.get(i), fragment.getPath()); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java index de75ca7..a492d6e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.query; import org.apache.tajo.IntegrationTest; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.client.TajoClient; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; @@ -165,9 +166,11 @@ public class TestNullValues { @Test public final void testResultSetNullSimpleQuery() throws Exception { String tableName = "nulltable5"; - ResultSet res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName); + ResultSet res = null; + TajoClient client = TajoTestingCluster.newTajoClient(); try { + res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client); int numRows = 0; String expected = @@ -193,7 +196,11 @@ public class TestNullValues { assertEquals(4, numRows); assertEquals(expected, result); } finally { - res.close(); + if (res != null) { + res.close(); + } + + client.close(); } } @@ -207,9 +214,11 @@ public class TestNullValues { "col4 " + "from " + tableName; - ResultSet res = runNullTableQuery(tableName, query); + TajoClient client = TajoTestingCluster.newTajoClient(); + ResultSet res = null; try { + res = runNullTableQuery(tableName, query, client); int numRows = 0; String expected = "null|99999|a|a|1.0|1.0|true\n" + @@ -234,11 +243,15 @@ public class TestNullValues { assertEquals(4, numRows); assertEquals(expected, result); } finally { - res.close(); + if (res != null) { + res.close(); + } + + client.close(); } } - private ResultSet runNullTableQuery(String tableName, String query) throws Exception { + private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception { String [] table = new String[] {tableName}; Schema schema = new Schema(); schema.addColumn("col1", Type.INT4); @@ -256,10 +269,11 @@ public class TestNullValues { tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); - ResultSet res = TajoTestingCluster - .run(table, schemas, tableOptions, new String[][]{data}, query); - - return res; + if (client == null) { + return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query); + } else { + return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query, client); + } } private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException { http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index 17ce37b..41e2f3e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -81,6 +81,21 @@ public class TestSelectQuery extends QueryTestCaseBase { } @Test + public final void testSimpleQueryWithLimitPartitionedTable() throws Exception { + // select * from customer_parts limit 10; + executeDDL("customer_ddl.sql", null); + for (int i = 0; i < 5; i++) { + executeFile("insert_into_customer.sql").close(); + } + + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + + executeString("DROP TABLE customer_parts PURGE").close(); + } + + @Test public final void testExplainSelect() throws Exception { // explain select l_orderkey, l_partkey from lineitem; ResultSet res = executeQuery(); http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java index 38908db..2c06b88 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.client.TajoClient; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; @@ -139,6 +140,7 @@ public class TestResultSet { TimeZone.setDefault(TimeZone.getTimeZone("UTC")); ResultSet res = null; + TajoClient client = TajoTestingCluster.newTajoClient(); try { String tableName = "datetimetable"; String query = "select col1, col2, col3 from " + tableName; @@ -156,7 +158,7 @@ public class TestResultSet { tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); res = TajoTestingCluster - .run(table, schemas, tableOptions, new String[][]{data}, query); + .run(table, schemas, tableOptions, new String[][]{data}, query, client); assertTrue(res.next()); @@ -212,7 +214,11 @@ public class TestResultSet { } finally { TajoConf.setCurrentTimeZone(tajoCurrentTimeZone); TimeZone.setDefault(systemCurrentTimeZone); - res.close(); + if (res != null) { + res.close(); + } + + client.close(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index b6ac551..a92b751 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -25,6 +25,7 @@ import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; @@ -32,10 +33,7 @@ import org.apache.tajo.engine.eval.BinaryEval; import org.apache.tajo.engine.eval.EvalType; import org.apache.tajo.engine.eval.FieldEval; import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.engine.planner.LogicalOptimizer; -import org.apache.tajo.engine.planner.LogicalPlan; -import org.apache.tajo.engine.planner.LogicalPlanner; -import org.apache.tajo.engine.planner.PlanningException; +import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.GlobalPlanner; @@ -55,6 +53,8 @@ import java.util.Map; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class TestGlobalPlanner { private static Log LOG = LogFactory.getLog(TestGlobalPlanner.class); @@ -80,10 +80,10 @@ public class TestGlobalPlanner { // TPC-H Schema for Complex Queries String [] tables = { - "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer" + "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer", "customer_parts" }; int [] volumes = { - 100, 200, 50, 5, 5, 800, 300, 100 + 100, 200, 50, 5, 5, 800, 300, 100, 707 }; tpch = new TPCH(); tpch.loadSchemas(); @@ -96,6 +96,19 @@ public class TestGlobalPlanner { CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, tables[i]), tpch.getSchema(tables[i]), m, CommonTestingUtil.getTestDir()); d.setStats(stats); + + if (tables[i].equals(TPCH.CUSTOMER_PARTS)) { + Schema expressionSchema = new Schema(); + expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4); + PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc( + DEFAULT_DATABASE_NAME, + tables[i], + CatalogProtos.PartitionType.COLUMN, + "c_nationkey", + expressionSchema); + + d.setPartitionMethod(partitionMethodDesc); + } catalog.createTable(d); } @@ -174,10 +187,10 @@ public class TestGlobalPlanner { visitChildExecutionBLock(plan, root, evalMap); // Find required shuffleKey. - Assert.assertTrue(evalMap.get(eval1).booleanValue()); + assertTrue(evalMap.get(eval1).booleanValue()); // Find that ShuffleKeys only includes equi-join conditions - Assert.assertFalse(evalMap.get(eval2).booleanValue()); + assertFalse(evalMap.get(eval2).booleanValue()); } private void visitChildExecutionBLock(MasterPlan plan, ExecutionBlock parentBlock, @@ -290,4 +303,43 @@ public class TestGlobalPlanner { public void testTPCH_Q5() throws Exception { buildPlan(FileUtil.readTextFile(new File("benchmark/tpch/q5.sql"))); } + + @Test + public void testCheckIfSimpleQuery() throws Exception { + MasterPlan plan = buildPlan("select * from customer"); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + //partition table + plan = buildPlan("select * from customer_parts"); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + plan = buildPlan("select * from customer where c_nationkey = 1"); + assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + plan = buildPlan("select * from customer_parts where c_nationkey = 1"); + assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + // same column order + plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" + + " from customer"); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + plan = buildPlan("select c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " + + " from customer_parts"); + assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + // different column order + plan = buildPlan("select c_name, c_custkey, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" + + " from customer"); + assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + plan = buildPlan("select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " + + " from customer_parts"); + assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + + plan = buildPlan("insert into customer_parts " + + " select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " + + " from customer"); + assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan())); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql new file mode 100644 index 0000000..ca43710 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql @@ -0,0 +1,9 @@ +CREATE TABLE customer_parts ( + c_custkey INT4, + c_name TEXT, + c_address TEXT, + c_phone TEXT, + c_acctbal FLOAT8, + c_mktsegment TEXT, + c_comment TEXT +) PARTITION BY COLUMN (c_nationkey INT4); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql new file mode 100644 index 0000000..8767ba4 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql @@ -0,0 +1,11 @@ +INSERT INTO customer_parts + SELECT + c_custkey, + c_name, + c_address, + c_phone, + c_acctbal, + c_mktsegment, + c_comment, + c_nationkey + FROM customer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql new file mode 100644 index 0000000..42362b6 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql @@ -0,0 +1 @@ +select * from customer_parts limit 10; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result new file mode 100644 index 0000000..5704ccb --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result @@ -0,0 +1,12 @@ +c_custkey,c_name,c_address,c_phone,c_acctbal,c_mktsegment,c_comment,c_nationkey +------------------------------- +3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1 +3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1 +3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1 +3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1 +3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1 +2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13 +2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13 +2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13 +2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13 +2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 0b5e28f..e6b12b1 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -34,4 +34,5 @@ Available Session Variables: \set NULL_CHAR [text value] - null char of text file output \set CODEGEN [true or false] - Runtime code generation enabled (experiment) \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs. +\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled \ No newline at end of file
