Repository: tajo Updated Branches: refs/heads/master fc92be782 -> 0f7ff8f01
TAJO-751: JDBC driver should support cancel() method. Closes #605, Closes #459 Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0f7ff8f0 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0f7ff8f0 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0f7ff8f0 Branch: refs/heads/master Commit: 0f7ff8f01170665000ac7d2edc1d28710de69af4 Parents: fc92be7 Author: navis.ryu <[email protected]> Authored: Tue Jun 23 13:48:44 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Tue Jun 23 13:48:44 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/client/QueryClientImpl.java | 12 +- .../apache/tajo/client/SessionConnection.java | 18 +- .../java/org/apache/tajo/client/TajoClient.java | 3 + .../org/apache/tajo/client/TajoClientUtil.java | 21 +- .../org/apache/tajo/jdbc/FetchResultSet.java | 15 +- .../apache/tajo/jdbc/TajoMemoryResultSet.java | 10 +- .../org/apache/tajo/jdbc/TajoResultSetBase.java | 19 +- .../org/apache/tajo/jdbc/WaitingResultSet.java | 71 +++++ .../java/org/apache/tajo/OverridableConf.java | 4 + .../src/main/java/org/apache/tajo/QueryId.java | 4 + .../main/java/org/apache/tajo/SessionVars.java | 4 +- .../java/org/apache/tajo/conf/TajoConf.java | 1 + .../java/org/apache/tajo/util/KeyValueSet.java | 59 +++- .../org/apache/tajo/master/QueryInProgress.java | 10 +- .../tajo/master/TajoMasterClientService.java | 7 +- .../apache/tajo/querymaster/Repartitioner.java | 4 +- .../java/org/apache/tajo/jdbc/TestTajoJdbc.java | 37 ++- .../TestTajoCli/testHelpSessionVars.result | 1 + .../org/apache/tajo/jdbc/JdbcConnection.java | 6 +- .../apache/tajo/jdbc/TajoMetaDataResultSet.java | 9 +- .../apache/tajo/jdbc/TajoPreparedStatement.java | 276 ++----------------- .../org/apache/tajo/jdbc/TajoStatement.java | 199 ++++++++----- 23 files changed, 396 insertions(+), 397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6c54511..d64022a 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-751: JDBC driver should support cancel() method. + (Contributed by navis, Committed by jihoon) + TAJO-1649: Change Rest API /databases/{database-name}/functions to /functions. (Contributed by DaeMyung Kang, Committed by hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index ac25933..da10f55 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -258,17 +258,7 @@ public class QueryClientImpl implements QueryClient { return createNullResultSet(queryId); } - QueryStatus status = getQueryStatus(queryId); - - while(status != null && !TajoClientUtil.isQueryComplete(status.getState())) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - status = getQueryStatus(queryId); - } + QueryStatus status = TajoClientUtil.waitCompletion(this, queryId); if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { if (status.hasResult()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index ee2d45a..e1a3791 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.annotation.NotNull; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.ipc.ClientProtos; @@ -67,16 +68,16 @@ public class SessionConnection implements Closeable { volatile TajoIdProtos.SessionIdProto sessionId; - private AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); /** session variable cache */ private final Map<String, String> sessionVarsCache = new HashMap<String, String>(); - private ServiceTracker serviceTracker; + private final ServiceTracker serviceTracker; private NettyClientBase client; - private KeyValueSet properties; + private final KeyValueSet properties; /** * Connect to TajoMaster @@ -87,17 +88,16 @@ public class SessionConnection implements Closeable { * @param properties configurations * @throws java.io.IOException */ - public SessionConnection(ServiceTracker tracker, @Nullable String baseDatabase, - KeyValueSet properties) throws IOException { - + public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseDatabase, + @NotNull KeyValueSet properties) throws IOException { + this.serviceTracker = tracker; + this.baseDatabase = baseDatabase; this.properties = properties; this.manager = RpcClientManager.getInstance(); this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES)); this.userInfo = UserRoleInfo.getCurrentUser(); - this.baseDatabase = baseDatabase != null ? baseDatabase : null; - this.serviceTracker = tracker; try { this.client = getTajoMasterConnection(); } catch (ServiceException e) { @@ -125,7 +125,7 @@ public class SessionConnection implements Closeable { } } - protected KeyValueSet getProperties() { + public KeyValueSet getProperties() { return properties; } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 376f63f..85929e8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -19,9 +19,12 @@ package org.apache.tajo.client; import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.util.KeyValueSet; import java.io.Closeable; @ThreadSafe public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable { + + KeyValueSet getProperties(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java index ea15aed..bab699e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -18,7 +18,9 @@ package org.apache.tajo.client; +import com.google.protobuf.ServiceException; import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogUtil; @@ -27,6 +29,8 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; +import org.apache.tajo.jdbc.TajoResultSetBase; +import org.apache.tajo.rpc.RpcUtils; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import java.io.IOException; @@ -56,6 +60,21 @@ public class TajoClientUtil { return !isQueryWaitingForSchedule(state) && !isQueryRunning(state); } + public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws ServiceException { + QueryStatus status = client.getQueryStatus(queryId); + + while(!isQueryComplete(status.getState())) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + status = client.getQueryStatus(queryId); + } + return status; + } + public static ResultSet createResultSet(TajoClient client, QueryId queryId, ClientProtos.GetQueryResultResponse response, int fetchRows) throws IOException { @@ -91,7 +110,7 @@ public class TajoClientUtil { } public static ResultSet createNullResultSet() { - return new TajoMemoryResultSet(null, new Schema(), null, 0, null); + return new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null); } public static ResultSet createNullResultSet(QueryId queryId) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java index efe070e..869d7c4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -27,26 +27,19 @@ import java.io.IOException; import java.sql.SQLException; public class FetchResultSet extends TajoResultSetBase { - private QueryClient tajoClient; - private QueryId queryId; + protected QueryClient tajoClient; private int fetchRowNum; private TajoMemoryResultSet currentResultSet; - private boolean finished = false; -// maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. + private boolean finished; + // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit. private int maxRows; public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) { - super(tajoClient.getClientSideSessionVars()); + super(queryId, schema, tajoClient.getClientSideSessionVars()); this.tajoClient = tajoClient; this.maxRows = tajoClient.getMaxRows(); - this.queryId = queryId; this.fetchRowNum = fetchRowNum; this.totalRow = Integer.MAX_VALUE; - this.schema = schema; - } - - public QueryId getQueryId() { - return queryId; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java index 33cb838..4114d03 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java @@ -31,20 +31,16 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; public class TajoMemoryResultSet extends TajoResultSetBase { - private QueryId queryId; private List<ByteString> serializedTuples; private AtomicBoolean closed = new AtomicBoolean(false); private RowStoreUtil.RowStoreDecoder decoder; public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> serializedTuples, int maxRowNum, Map<String, String> clientSideSessionVars) { - super(clientSideSessionVars); - this.queryId = queryId; - this.schema = schema; + super(queryId, schema, clientSideSessionVars); this.totalRow = maxRowNum; this.serializedTuples = serializedTuples; this.decoder = RowStoreUtil.createDecoder(schema); - init(); } @Override @@ -53,10 +49,6 @@ public class TajoMemoryResultSet extends TajoResultSetBase { curRow = 0; } - public QueryId getQueryId() { - return queryId; - } - @Override public synchronized void close() throws SQLException { if (closed.getAndSet(true)) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java index ed06cf3..3d8d9aa 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java @@ -18,6 +18,7 @@ package org.apache.tajo.jdbc; +import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; @@ -47,7 +48,11 @@ public abstract class TajoResultSetBase implements ResultSet { protected Schema schema; protected Tuple cur; - public TajoResultSetBase(@Nullable Map<String, String> clientSideSessionVars) { + protected final QueryId queryId; + + public TajoResultSetBase(QueryId queryId, Schema schema, @Nullable Map<String, String> clientSideSessionVars) { + this.queryId = queryId; + this.schema = schema; this.clientSideSessionVars = clientSideSessionVars; if (clientSideSessionVars != null) { @@ -73,6 +78,14 @@ public abstract class TajoResultSetBase implements ResultSet { return wasNull = tuple.isBlankOrNull(index); } + protected Schema getSchema() throws SQLException { + return schema; + } + + public QueryId getQueryId() { + return queryId; + } + public Tuple getCurrentTuple() { return cur; } @@ -395,7 +408,7 @@ public abstract class TajoResultSetBase implements ResultSet { @Override public int findColumn(String colName) throws SQLException { - return schema.getColumnIdByName(colName); + return getSchema().getColumnIdByName(colName); } @Override @@ -511,7 +524,7 @@ public abstract class TajoResultSetBase implements ResultSet { @Override public ResultSetMetaData getMetaData() throws SQLException { - return new TajoResultSetMetaData(schema); + return new TajoResultSetMetaData(getSchema()); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java new file mode 100644 index 0000000..b9f8df5 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.jdbc; + +import com.google.protobuf.ServiceException; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.client.QueryClient; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.ipc.ClientProtos; + +import java.sql.SQLException; + +/** + * Blocks on schema retrieval if it's not ready + */ +public class WaitingResultSet extends FetchResultSet { + + public WaitingResultSet(QueryClient tajoClient, QueryId queryId, int fetchRowNum) + throws SQLException { + super(tajoClient, null, queryId, fetchRowNum); + } + + @Override + public boolean next() throws SQLException { + getSchema(); + return super.next(); + } + + @Override + protected Schema getSchema() throws SQLException { + return schema == null ? schema = waitOnResult() : schema; + } + + private Schema waitOnResult() throws SQLException { + try { + QueryStatus status = TajoClientUtil.waitCompletion(tajoClient, queryId); + + if (status.getState() != TajoProtos.QueryState.QUERY_SUCCEEDED) { + throw new ServiceException(status.getErrorMessage() != null ? status.getErrorMessage() : + status.getErrorTrace() != null ? status.getErrorTrace() : + "Failed to execute query by unknown reason"); + } + ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(queryId); + TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc()); + return tableDesc.getLogicalSchema(); + } catch (ServiceException e) { + throw new SQLException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java index 61bbb5a..c22f054 100644 --- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java @@ -98,6 +98,7 @@ public class OverridableConf extends KeyValueSet { } } + @Override public boolean getBool(ConfigKey key) { return getBool(key, null); } @@ -123,6 +124,7 @@ public class OverridableConf extends KeyValueSet { } } + @Override public int getInt(ConfigKey key) { return getInt(key, null); } @@ -148,6 +150,7 @@ public class OverridableConf extends KeyValueSet { } } + @Override public long getLong(ConfigKey key) { return getLong(key, null); } @@ -173,6 +176,7 @@ public class OverridableConf extends KeyValueSet { } } + @Override public float getFloat(ConfigKey key) { return getLong(key, null); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-common/src/main/java/org/apache/tajo/QueryId.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java index 85882c1..35ca75c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java +++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java @@ -44,6 +44,10 @@ public class QueryId implements Comparable<QueryId> { return seq; } + public boolean isNull() { + return this.equals(QueryIdFactory.NULL_QUERY_ID); + } + @Override public String toString() { return QUERY_ID_PREFIX + SEPARATOR + toStringNoPrefix(); http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-common/src/main/java/org/apache/tajo/SessionVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 031387c..98c2f3e 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -102,7 +102,7 @@ public enum SessionVars implements ConfigKey { GROUPBY_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME, "shuffle output size for sort (mb)", DEFAULT, Integer.class, Validators.min("1")), TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME, - "shuffle output size for partition table write (mb)", DEFAULT, Long.class, Validators.min("1")), + "shuffle output size for partition table write (mb)", DEFAULT, Integer.class, Validators.min("1")), GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT, Boolean.class, Validators.bool()), @@ -133,6 +133,8 @@ public enum SessionVars implements ConfigKey { // ResultSet ---------------------------------------------------------------- FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT, Integer.class, Validators.min("0")), + BLOCK_ON_RESULT(ConfVars.$RESULT_SET_BLOCK_WAIT, "Whether to block result set on query execution", DEFAULT, + Boolean.class, Validators.bool()), //------------------------------------------------------------------------------- // Only for Unit Testing http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 3f350c3..ba777c1 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -378,6 +378,7 @@ public class TajoConf extends Configuration { // ResultSet --------------------------------------------------------- $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200), + $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true), ; public final String varname; http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java index 5dba9e2..0e27769 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java @@ -21,7 +21,10 @@ package org.apache.tajo.util; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.gson.annotations.Expose; +import org.apache.tajo.ConfigKey; +import org.apache.tajo.SessionVars; import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.json.CommonGsonHelper; import org.apache.tajo.json.GsonObject; @@ -37,7 +40,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs public static final String FALSE_STR = "false"; @Expose private Map<String,String> keyVals; - + public KeyValueSet() { keyVals = TUtil.newHashMap(); } @@ -46,23 +49,23 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs this(); putAll(keyVals); } - + public KeyValueSet(KeyValueSetProto proto) { this.keyVals = TUtil.newHashMap(); for(KeyValueProto keyval : proto.getKeyvalList()) { this.keyVals.put(keyval.getKey(), keyval.getValue()); } } - + public KeyValueSet(KeyValueSet keyValueSet) { this(); this.keyVals.putAll(keyValueSet.keyVals); } - + public static KeyValueSet create() { return new KeyValueSet(); } - + public static KeyValueSet create(KeyValueSet keyValueSet) { return new KeyValueSet(keyValueSet); } @@ -119,7 +122,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs public boolean getBool(String key, Boolean defaultVal) { if (containsKey(key)) { String strVal = get(key, null); - return strVal != null ? strVal.equalsIgnoreCase(TRUE_STR) : false; + return strVal != null && strVal.equalsIgnoreCase(TRUE_STR); } else if (defaultVal != null) { return defaultVal; } else { @@ -131,6 +134,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs return getBool(key, null); } + public boolean getBool(ConfigKey key) { + String keyName = key.keyname(); + if (key instanceof SessionVars) { + return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal); + } else if (key instanceof TajoConf.ConfVars) { + return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal); + } + return getBool(keyName); + } + public void setInt(String key, int val) { set(key, String.valueOf(val)); } @@ -150,6 +163,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs return getInt(key, null); } + public int getInt(ConfigKey key) { + String keyName = key.keyname(); + if (key instanceof SessionVars) { + return getInt(keyName, ((SessionVars) key).getConfVars().defaultIntVal); + } else if (key instanceof TajoConf.ConfVars) { + return getInt(keyName, ((TajoConf.ConfVars) key).defaultIntVal); + } + return getInt(keyName); + } + public void setLong(String key, long val) { set(key, String.valueOf(val)); } @@ -169,6 +192,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs return getLong(key, null); } + public long getLong(ConfigKey key) { + String keyName = key.keyname(); + if (key instanceof SessionVars) { + return getLong(keyName, ((SessionVars) key).getConfVars().defaultLongVal); + } else if (key instanceof TajoConf.ConfVars) { + return getLong(keyName, ((TajoConf.ConfVars) key).defaultLongVal); + } + return getLong(keyName); + } + public void setFloat(String key, float val) { set(key, String.valueOf(val)); } @@ -185,7 +218,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs throw new IllegalArgumentException("No such a config key: " + key); } } else if (defaultVal != null) { - return defaultVal.floatValue(); + return defaultVal; } else { throw new IllegalArgumentException("No such a config key: " + key); } @@ -194,7 +227,17 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs public float getFloat(String key) { return getFloat(key, null); } - + + public float getFloat(ConfigKey key) { + String keyName = key.keyname(); + if (key instanceof SessionVars) { + return getFloat(keyName, ((SessionVars) key).getConfVars().defaultFloatVal); + } else if (key instanceof TajoConf.ConfVars) { + return getFloat(keyName, ((TajoConf.ConfVars) key).defaultFloatVal); + } + return getFloat(keyName); + } + public String remove(String key) { return keyVals.remove(key); } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 6a074a2..ece42f7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -247,7 +247,7 @@ public class QueryInProgress { // terminal state will let client to retrieve a query result // So, we must set the query result before changing query state - if (isFinishState(this.queryInfo.getQueryState())) { + if (isFinishState()) { if (queryInfo.hasResultdesc()) { this.queryInfo.setResultDesc(queryInfo.getResultDesc()); } @@ -260,7 +260,13 @@ public class QueryInProgress { } } - private boolean isFinishState(TajoProtos.QueryState state) { + public boolean isKillWait() { + TajoProtos.QueryState state = queryInfo.getQueryState(); + return state == TajoProtos.QueryState.QUERY_KILL_WAIT; + } + + public boolean isFinishState() { + TajoProtos.QueryState state = queryInfo.getQueryState(); return state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR || state == TajoProtos.QueryState.QUERY_KILLED || http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 2602d7d..31eecdc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -54,7 +54,6 @@ import org.apache.tajo.plan.logical.PartitionedTableScanNode; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.rpc.BlockingRpcServer; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto; @@ -619,6 +618,12 @@ public class TajoMasterClientService extends AbstractService { try { context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); + + QueryInProgress progress = context.getQueryJobManager().getQueryInProgress(queryId); + if (progress == null || progress.isFinishState() || progress.isKillWait()) { + return BOOL_TRUE; + } + QueryManager queryManager = context.getQueryJobManager(); queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, new QueryInfo(queryId))); http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 4fe150b..ec09145 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -962,8 +962,8 @@ public class Repartitioner { public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates, String tableName) { - long splitVolume = StorageUnit.MB * - stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); + long splitVolume = (long)StorageUnit.MB * + stage.getMasterPlan().getContext().getInt(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); long pageSize = ((long)StorageUnit.MB) * stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes if (pageSize >= splitVolume) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index c8c24cd..ad74046 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -19,13 +19,12 @@ package org.apache.tajo.jdbc; import com.google.common.collect.Maps; -import org.apache.tajo.IntegrationTest; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryClient; +import org.apache.tajo.client.QueryStatus; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -33,10 +32,7 @@ import org.junit.experimental.categories.Category; import java.net.InetSocketAddress; import java.sql.*; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; @@ -673,4 +669,31 @@ public class TestTajoJdbc extends QueryTestCaseBase { } } } + + @Test + public final void testCancel() throws Exception { + String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), + DEFAULT_DATABASE_NAME); + Properties props = new Properties(); + props.setProperty(SessionVars.BLOCK_ON_RESULT.keyname(), "false"); + + Connection conn = new JdbcConnection(connUri, props); + PreparedStatement statement = conn.prepareStatement("select sleep(1) from lineitem"); + try { + assertTrue("should have result set", statement.execute()); + TajoResultSetBase result = (TajoResultSetBase) statement.getResultSet(); + Thread.sleep(1000); // todo query master is not killed properly if it's compiling the query (use 100, if you want see) + statement.cancel(); + + QueryStatus status = client.getQueryStatus(result.getQueryId()); + assertEquals(TajoProtos.QueryState.QUERY_KILLED, status.getState()); + } finally { + if (statement != null) { + statement.close(); + } + if (conn != null) { + conn.close(); + } + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index 7e741a9..137b0de 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -37,4 +37,5 @@ Available Session Variables: \set CODEGEN [true or false] - Runtime code generation enabled (experiment) \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs. \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master +\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index c5d4868..d575968 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -21,11 +21,13 @@ package org.apache.tajo.jdbc; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.client.CatalogAdminClient; import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.jdbc.util.QueryStringDecoder; import org.apache.tajo.rpc.RpcUtils; import org.apache.tajo.util.KeyValueSet; @@ -55,6 +57,8 @@ public class JdbcConnection implements Connection { /** it will be used soon. */ private final Map<String, List<String>> params; + private final KeyValueSet clientProperties; + public JdbcConnection(String rawURI, Properties properties) throws SQLException { this.rawURI = rawURI; this.properties = properties; @@ -101,7 +105,7 @@ public class JdbcConnection implements Connection { throw new SQLException("Invalid JDBC URI: " + rawURI, "TAJO-001"); } - KeyValueSet clientProperties = new KeyValueSet(); + clientProperties = new KeyValueSet(); if(properties != null) { for(Map.Entry<Object, Object> entry: properties.entrySet()) { clientProperties.set(entry.getKey().toString(), entry.getValue().toString()); http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java index eb3595f..9fba40a 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java @@ -30,17 +30,12 @@ public class TajoMetaDataResultSet extends TajoResultSetBase { private List<MetaDataTuple> values; public TajoMetaDataResultSet(Schema schema, List<MetaDataTuple> values) { - super(null); - init(); - this.schema = schema; + super(null, schema, null); setDataTuples(values); } public TajoMetaDataResultSet(List<String> columns, List<Type> types, List<MetaDataTuple> values) { - super(null); - init(); - schema = new Schema(); - + super(null, new Schema(), null); int index = 0; if(columns != null) { for(String columnName: columns) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java index 229587a..0574bf9 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java @@ -30,35 +30,16 @@ import java.util.HashMap; * TajoPreparedStatement. * */ -public class TajoPreparedStatement implements PreparedStatement { - private JdbcConnection conn; +public class TajoPreparedStatement extends TajoStatement implements PreparedStatement { + private final String sql; - private TajoClient tajoClient; + /** * save the SQL parameters {paramLoc:paramValue} */ private final HashMap<Integer, String> parameters=new HashMap<Integer, String>(); /** - * We need to keep a reference to the result set to support the following: - * <code> - * statement.execute(String sql); - * statement.getResultSet(); - * </code>. - */ - private ResultSet resultSet = null; - - /** - * Add SQLWarnings to the warningChain if needed. - */ - //private SQLWarning warningChain = null; - - /** - * Keep state so we can fail certain calls made after close(). - */ - private boolean isClosed = false; - - /** * keep the current ResultRet update count */ private int updateCount = 0; @@ -69,8 +50,7 @@ public class TajoPreparedStatement implements PreparedStatement { public TajoPreparedStatement(JdbcConnection conn, TajoClient tajoClient, String sql) { - this.conn = conn; - this.tajoClient = tajoClient; + super(conn, tajoClient); this.sql = sql; } @@ -81,6 +61,7 @@ public class TajoPreparedStatement implements PreparedStatement { @Override public void clearParameters() throws SQLException { + checkConnection("Can't clear parameters"); this.parameters.clear(); } @@ -101,22 +82,14 @@ public class TajoPreparedStatement implements PreparedStatement { return updateCount; } - protected ResultSet executeImmediate(String sql) throws SQLException { - if (isClosed) { - throw new SQLException("Can't execute after statement has been closed"); - } + protected TajoResultSetBase executeImmediate(String sql) throws SQLException { + checkConnection("Can't execute"); try { if (sql.contains("?")) { sql = updateSql(sql, parameters); } - if (TajoStatement.isSetVariableQuery(sql)) { - return TajoStatement.setSessionVariable(tajoClient, sql); - } else if (TajoStatement.isUnSetVariableQuery(sql)) { - return TajoStatement.unSetSessionVariable(tajoClient, sql); - } else { - return tajoClient.executeQueryAndGetResult(sql); - } + return (TajoResultSetBase) executeSQL(sql); } catch (Exception e) { throw new SQLException(e.getMessage(), e); } @@ -131,7 +104,7 @@ public class TajoPreparedStatement implements PreparedStatement { */ private String updateSql(final String sql, HashMap<Integer, String> parameters) { - StringBuffer newSql = new StringBuffer(sql); + StringBuilder newSql = new StringBuilder(sql); int paramLoc = 1; while (getCharIndexFromSqlByParamLocation(sql, '?', paramLoc) > 0) { @@ -179,6 +152,7 @@ public class TajoPreparedStatement implements PreparedStatement { @Override public ResultSetMetaData getMetaData() throws SQLException { + checkConnection("Can't get metadata"); if(resultSet != null) { return resultSet.getMetaData(); } else { @@ -249,6 +223,7 @@ public class TajoPreparedStatement implements PreparedStatement { @Override public void setBoolean(int parameterIndex, boolean x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex, "" + x); } @@ -306,21 +281,25 @@ public class TajoPreparedStatement implements PreparedStatement { @Override public void setDouble(int parameterIndex, double x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex,"" + x); } @Override public void setFloat(int parameterIndex, float x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex,"" + x); } @Override public void setInt(int parameterIndex, int x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex,"" + x); } @Override public void setLong(int parameterIndex, long x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex,"" + x); } @@ -399,11 +378,13 @@ public class TajoPreparedStatement implements PreparedStatement { @Override public void setShort(int parameterIndex, short x) throws SQLException { + checkConnection("Can't set parameters"); this.parameters.put(parameterIndex,"" + x); } @Override public void setString(int parameterIndex, String x) throws SQLException { + checkConnection("Can't set parameters"); x=x.replace("'", "\\'"); this.parameters.put(parameterIndex,"'" + x +"'"); } @@ -439,227 +420,4 @@ public class TajoPreparedStatement implements PreparedStatement { throws SQLException { throw new SQLFeatureNotSupportedException("setUnicodeStream not supported"); } - - @Override - public void addBatch(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("addBatch not supported"); - } - - @Override - public void cancel() throws SQLException { - throw new SQLFeatureNotSupportedException("cancel not supported"); - } - - @Override - public void clearBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("clearBatch not supported"); - } - - @Override - public void clearWarnings() throws SQLException { - } - - public void closeOnCompletion() throws SQLException { - // JDK 1.7 - throw new SQLFeatureNotSupportedException("closeOnCompletion"); - } - - @Override - public void close() throws SQLException { - if (resultSet!=null) { - resultSet.close(); - resultSet = null; - } - isClosed = true; - } - - @Override - public boolean execute(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("execute(sql) not supported"); - } - - @Override - public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { - throw new SQLFeatureNotSupportedException("execute(sql) not supported"); - } - - @Override - public boolean execute(String sql, int[] columnIndexes) throws SQLException { - throw new SQLFeatureNotSupportedException("execute(sql) not supported"); - } - - @Override - public boolean execute(String sql, String[] columnNames) throws SQLException { - throw new SQLFeatureNotSupportedException("execute(sql) not supported"); - } - - @Override - public int[] executeBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("executeBatch not supported"); - } - - @Override - public ResultSet executeQuery(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("executeQuery(sql) not supported"); - } - - @Override - public int executeUpdate(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); - } - - @Override - public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); - } - - @Override - public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); - } - - @Override - public int executeUpdate(String sql, String[] columnNames) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); - } - - @Override - public Connection getConnection() throws SQLException { - return conn; - } - - @Override - public int getFetchDirection() throws SQLException { - throw new SQLFeatureNotSupportedException("getFetchDirection not supported"); - } - - @Override - public int getFetchSize() throws SQLException { - throw new SQLFeatureNotSupportedException("getFetchSize not supported"); - } - - @Override - public ResultSet getGeneratedKeys() throws SQLException { - throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported"); - } - - @Override - public int getMaxFieldSize() throws SQLException { - throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported"); - } - - @Override - public int getMaxRows() throws SQLException { - throw new SQLFeatureNotSupportedException("getMaxRows not supported"); - } - - @Override - public boolean getMoreResults() throws SQLException { - throw new SQLFeatureNotSupportedException("getMoreResults not supported"); - } - - @Override - public boolean getMoreResults(int current) throws SQLException { - throw new SQLFeatureNotSupportedException("getMoreResults not supported"); - } - - @Override - public int getQueryTimeout() throws SQLException { - throw new SQLFeatureNotSupportedException("getQueryTimeout not supported"); - } - - @Override - public ResultSet getResultSet() throws SQLException { - return this.resultSet; - } - - @Override - public int getResultSetConcurrency() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported"); - } - - @Override - public int getResultSetHoldability() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported"); - } - - @Override - public int getResultSetType() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetType not supported"); - } - - @Override - public int getUpdateCount() throws SQLException { - return updateCount; - } - - @Override - public SQLWarning getWarnings() throws SQLException { - return null; - } - - @Override - public boolean isClosed() throws SQLException { - return isClosed; - } - - public boolean isCloseOnCompletion() throws SQLException { - //JDK 1.7 - throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported"); - } - - @Override - public boolean isPoolable() throws SQLException { - throw new SQLFeatureNotSupportedException("isPoolable not supported"); - } - - @Override - public void setCursorName(String name) throws SQLException { - throw new SQLFeatureNotSupportedException("setCursorName not supported"); - } - - @Override - public void setEscapeProcessing(boolean enable) throws SQLException { - throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported"); - } - - @Override - public void setFetchDirection(int direction) throws SQLException { - throw new SQLFeatureNotSupportedException("setFetchDirection not supported"); - } - - @Override - public void setFetchSize(int rows) throws SQLException { - throw new SQLFeatureNotSupportedException("setFetchSize not supported"); - } - - @Override - public void setMaxFieldSize(int max) throws SQLException { - throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported"); - } - - @Override - public void setMaxRows(int max) throws SQLException { - throw new SQLFeatureNotSupportedException("setMaxRows not supported"); - } - - @Override - public void setPoolable(boolean poolable) throws SQLException { - throw new SQLFeatureNotSupportedException("setPoolable not supported"); - } - - @Override - public void setQueryTimeout(int seconds) throws SQLException { - throw new SQLFeatureNotSupportedException("setQueryTimeout not supported"); - } - - @Override - public boolean isWrapperFor(Class<?> iface) throws SQLException { - throw new SQLFeatureNotSupportedException("isWrapperFor not supported"); - } - - @Override - public <T> T unwrap(Class<T> iface) throws SQLException { - throw new SQLFeatureNotSupportedException("unwrap not supported"); - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0f7ff8f0/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index 820e350..0f80ddf 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@ -19,17 +19,21 @@ package org.apache.tajo.jdbc; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; +import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientUtil; +import org.apache.tajo.ipc.ClientProtos; +import java.io.IOException; import java.sql.*; import java.util.HashMap; import java.util.Map; public class TajoStatement implements Statement { - private JdbcConnection conn; - private TajoClient tajoClient; - private int fetchSize = 200; + protected JdbcConnection conn; + protected TajoClient tajoClient; + protected int fetchSize = SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal; /** * We need to keep a reference to the result set to support the following: @@ -38,35 +42,65 @@ public class TajoStatement implements Statement { * statement.getResultSet(); * </code>. */ - private ResultSet resultSet = null; + protected TajoResultSetBase resultSet = null; + + /** + * Add SQLWarnings to the warningChain if needed. + */ + protected SQLWarning warningChain = null; /** * Keep state so we can fail certain calls made after close(). */ - private boolean isClosed = false; + private boolean isClosed; + + private boolean blockWait; public TajoStatement(JdbcConnection conn, TajoClient tajoClient) { this.conn = conn; this.tajoClient = tajoClient; + this.blockWait = tajoClient.getProperties().getBool(SessionVars.BLOCK_ON_RESULT); } + /* + * NOTICE + * + * For unimplemented methods, this class throws an exception or prints an error message. + * If the unimplemented method can cause unexpected result to user application when it is called, + * it should throw an exception. + * Otherwise, it is enough that prints an error message. + */ + @Override public void addBatch(String sql) throws SQLException { - throw new SQLFeatureNotSupportedException("addBatch not supported"); + throw new SQLFeatureNotSupportedException("addBatch() is not supported yet."); } @Override public void cancel() throws SQLException { - throw new SQLFeatureNotSupportedException("cancel not supported"); + checkConnection("Can't cancel query"); + if (resultSet == null || resultSet.getQueryId().isNull()) { + return; + } + try { + tajoClient.killQuery(resultSet.getQueryId()); + } catch (Exception e) { + throw new SQLException(e); + } finally { + resultSet = null; + } } @Override public void clearBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("clearBatch not supported"); + throw new SQLFeatureNotSupportedException("clearBatch() is not supported yet."); } @Override - public void clearWarnings() throws SQLException {} + public void clearWarnings() throws SQLException { + checkConnection("Can't clear warnings"); + warningChain = null; + } @Override public void close() throws SQLException { @@ -79,55 +113,87 @@ public class TajoStatement implements Statement { public void closeOnCompletion() throws SQLException { // JDK 1.7 - throw new SQLFeatureNotSupportedException("closeOnCompletion not supported"); + throw new SQLFeatureNotSupportedException("closeOnCompletion() is not supported yet."); } @Override public boolean execute(String sql) throws SQLException { - resultSet = executeQuery(sql); + resultSet = (TajoResultSetBase) executeQuery(sql); return resultSet != null; } @Override public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { - throw new SQLFeatureNotSupportedException("execute not supported"); + throw new SQLFeatureNotSupportedException("execute() is not supported yet."); } @Override public boolean execute(String sql, int[] columnIndexes) throws SQLException { - throw new SQLFeatureNotSupportedException("execute not supported"); + throw new SQLFeatureNotSupportedException("execute() is not supported yet."); } @Override public boolean execute(String sql, String[] columnNames) throws SQLException { - throw new SQLFeatureNotSupportedException("execute not supported"); + throw new SQLFeatureNotSupportedException("execute() is not supported yet."); } @Override public int[] executeBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("executeBatch not supported"); + throw new SQLFeatureNotSupportedException("executeBatch() is not supported yet."); } @Override public ResultSet executeQuery(String sql) throws SQLException { - if (isClosed) { - throw new SQLException("Can't execute after statement has been closed"); - } + checkConnection("Can't execute"); try { - if (isSetVariableQuery(sql)) { - return setSessionVariable(tajoClient, sql); - } else if (isUnSetVariableQuery(sql)) { - return unSetSessionVariable(tajoClient, sql); - } else { - return tajoClient.executeQueryAndGetResult(sql); - } + return executeSQL(sql); } catch (Exception e) { throw new SQLException(e.getMessage(), e); } } + protected ResultSet executeSQL(String sql) throws SQLException, ServiceException, IOException { + if (isSetVariableQuery(sql)) { + return setSessionVariable(tajoClient, sql); + } + if (isUnSetVariableQuery(sql)) { + return unSetSessionVariable(tajoClient, sql); + } + + ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql); + if (response.getResultCode() == ClientProtos.ResultCode.ERROR) { + if (response.hasErrorMessage()) { + throw new ServiceException(response.getErrorMessage()); + } + if (response.hasErrorTrace()) { + throw new ServiceException(response.getErrorTrace()); + } + throw new ServiceException("Failed to submit query by unknown reason"); + } + + QueryId queryId = new QueryId(response.getQueryId()); + if (response.getIsForwarded() && !queryId.isNull()) { + WaitingResultSet result = new WaitingResultSet(tajoClient, queryId, fetchSize); + if (blockWait) { + result.getSchema(); + } + return result; + } + + if (response.hasResultSet() || response.hasTableDesc()) { + return TajoClientUtil.createResultSet(tajoClient, response, fetchSize); + } + return TajoClientUtil.createNullResultSet(queryId); + } + + protected void checkConnection(String errorMsg) throws SQLException { + if (isClosed) { + throw new SQLException(errorMsg + " after statement has been closed"); + } + } + public static boolean isSetVariableQuery(String sql) { if (sql == null || sql.trim().isEmpty()) { return false; @@ -144,7 +210,7 @@ public class TajoStatement implements Statement { return sql.trim().toLowerCase().startsWith("unset"); } - public static ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException { + private ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException { int index = sql.toLowerCase().indexOf("set"); if (index < 0) { throw new SQLException("SET statement should be started 'SET' keyword: " + sql); @@ -165,7 +231,7 @@ public class TajoStatement implements Statement { return TajoClientUtil.createNullResultSet(); } - public static ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException { + private ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException { int index = sql.toLowerCase().indexOf("unset"); if (index < 0) { throw new SQLException("UNSET statement should be started 'UNSET' keyword: " + sql); @@ -186,57 +252,57 @@ public class TajoStatement implements Statement { @Override public int executeUpdate(String sql) throws SQLException { + checkConnection("Can't execute update"); try { tajoClient.executeQuery(sql); return 1; } catch (Exception ex) { - throw new SQLFeatureNotSupportedException(ex.toString()); + throw new SQLException(ex.toString()); } } @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); + throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet."); } @Override public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); + throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet."); } @Override public int executeUpdate(String sql, String[] columnNames) throws SQLException { - throw new SQLFeatureNotSupportedException("executeUpdate not supported"); + throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet."); } @Override public Connection getConnection() throws SQLException { - if (isClosed) - throw new SQLException("Can't get connection after statement has been closed"); + checkConnection("Can't get connection"); return conn; } @Override public int getFetchDirection() throws SQLException { - throw new SQLFeatureNotSupportedException("getFetchDirection not supported"); + checkConnection("Can't get fetch direction"); + return ResultSet.FETCH_FORWARD; } @Override public int getFetchSize() throws SQLException { - if (isClosed) - throw new SQLException("Can't get fetch size after statement has been closed"); + checkConnection("Can't get fetch size"); return fetchSize; } @Override public ResultSet getGeneratedKeys() throws SQLException { - throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported"); + throw new SQLFeatureNotSupportedException("getGeneratedKeys() is not supported yet."); } @Override public int getMaxFieldSize() throws SQLException { - throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported"); + throw new SQLFeatureNotSupportedException("getMaxFieldSize() is not supported yet."); } @Override @@ -246,53 +312,54 @@ public class TajoStatement implements Statement { @Override public boolean getMoreResults() throws SQLException { - throw new SQLFeatureNotSupportedException("getMoreResults not supported"); + throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet."); } @Override public boolean getMoreResults(int current) throws SQLException { - throw new SQLFeatureNotSupportedException("getMoreResults not supported"); + throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet."); } @Override public int getQueryTimeout() throws SQLException { - throw new SQLFeatureNotSupportedException("getQueryTimeout not supported"); + System.err.println("getResultSetConcurrency() is not supported yet."); + return -1; } @Override public ResultSet getResultSet() throws SQLException { - if (isClosed) - throw new SQLException("Can't get result set after statement has been closed"); + checkConnection("Can't get result set"); return resultSet; } @Override public int getResultSetConcurrency() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported"); + System.err.println("getResultSetConcurrency() is not supported yet."); + return -1; } @Override public int getResultSetHoldability() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported"); + System.err.println("getResultSetHoldability() is not supported yet."); + return -1; } @Override public int getResultSetType() throws SQLException { - throw new SQLFeatureNotSupportedException("getResultSetType not supported"); + System.err.println("getResultSetType() is not supported yet."); + return -1; } @Override public int getUpdateCount() throws SQLException { - if (isClosed) - throw new SQLException("Can't get update count after statement has been closed"); - return 0; + System.err.println("getResultSetType() is not supported yet."); + return -1; } @Override public SQLWarning getWarnings() throws SQLException { - if (isClosed) - throw new SQLException("Can't get warnings after statement has been closed"); - return null; + checkConnection("Can't get warnings"); + return warningChain; } @Override @@ -302,40 +369,41 @@ public class TajoStatement implements Statement { public boolean isCloseOnCompletion() throws SQLException { // JDK 1.7 - throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported"); + throw new SQLFeatureNotSupportedException("isCloseOnCompletion() is not supported yet."); } @Override public boolean isPoolable() throws SQLException { - throw new SQLFeatureNotSupportedException("isPoolable not supported"); + throw new SQLFeatureNotSupportedException("isPoolable() is not supported yet."); } @Override public void setCursorName(String name) throws SQLException { - throw new SQLFeatureNotSupportedException("setCursorName not supported"); + System.err.println("setCursorName() is not supported yet."); } /** * Not necessary. */ @Override - public void setEscapeProcessing(boolean enable) throws SQLException {} + public void setEscapeProcessing(boolean enable) throws SQLException { + System.err.println("setEscapeProcessing() is not supported yet."); + } @Override public void setFetchDirection(int direction) throws SQLException { - throw new SQLFeatureNotSupportedException("setFetchDirection not supported"); + System.err.println("setFetchDirection() is not supported yet."); } @Override public void setFetchSize(int rows) throws SQLException { - if (isClosed) - throw new SQLException("Can't set fetch size after statement has been closed"); + checkConnection("Can't set fetch size"); fetchSize = rows; } @Override public void setMaxFieldSize(int max) throws SQLException { - throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported"); + System.err.println("setMaxFieldSize() is not supported yet."); } @Override @@ -348,22 +416,23 @@ public class TajoStatement implements Statement { @Override public void setPoolable(boolean poolable) throws SQLException { - throw new SQLFeatureNotSupportedException("setPoolable not supported"); + System.err.println("setPoolable() is not supported yet."); } @Override public void setQueryTimeout(int seconds) throws SQLException { - throw new SQLFeatureNotSupportedException("setQueryTimeout not supported"); + System.err.println("setQueryTimeout() is not supported yet."); } @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { - throw new SQLFeatureNotSupportedException("isWrapperFor not supported"); + System.err.println("isWrapperFor() is not supported yet."); + return false; } @Override public <T> T unwrap(Class<T> iface) throws SQLException { - throw new SQLFeatureNotSupportedException("unwrap not supported"); + System.err.println("unwrap() is not supported yet."); + return null; } - }
