http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 4900188..a97cb33 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -26,6 +26,9 @@ import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.NotNull; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; +import org.apache.tajo.catalog.exception.UndefinedDatabaseException; +import org.apache.tajo.client.v2.exception.ClientConnectionException; +import org.apache.tajo.exception.NoSuchSessionVariableException; import org.apache.tajo.exception.SQLExceptionUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; @@ -36,7 +39,6 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.RpcConstants; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse; @@ -57,14 +59,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE; -import static org.apache.tajo.exception.ReturnStateUtil.isError; -import static org.apache.tajo.exception.ReturnStateUtil.isSuccess; -import static org.apache.tajo.exception.ReturnStateUtil.isThisError; -import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException; +import static org.apache.tajo.error.Errors.ResultCode.UNDEFINED_DATABASE; +import static org.apache.tajo.exception.ReturnStateUtil.*; import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError; +import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse; -import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; public class SessionConnection implements Closeable { @@ -101,7 +101,7 @@ public class SessionConnection implements Closeable { * @throws SQLException */ public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseDatabase, - @NotNull KeyValueSet properties) throws SQLException { + @NotNull KeyValueSet properties) { this.serviceTracker = tracker; this.baseDatabase = baseDatabase; this.properties = properties; @@ -117,7 +117,7 @@ public class SessionConnection implements Closeable { return Collections.unmodifiableMap(sessionVarsCache); } - public synchronized NettyClientBase getTajoMasterConnection() throws SQLException { + public synchronized NettyClientBase getTajoMasterConnection() { if (client != null && client.isConnected()) { return client; @@ -138,14 +138,14 @@ public class SessionConnection implements Closeable { connections.incrementAndGet(); } catch (Throwable t) { - throw SQLExceptionUtil.makeUnableToEstablishConnection(t); + throw new ClientConnectionException(t); } return client; } } - protected BlockingInterface getTMStub() throws SQLException { + protected BlockingInterface getTMStub() { NettyClientBase tmClient; tmClient = getTajoMasterConnection(); BlockingInterface stub = tmClient.getStub(); @@ -185,7 +185,7 @@ public class SessionConnection implements Closeable { return userInfo; } - public String getCurrentDatabase() throws SQLException { + public String getCurrentDatabase() { NettyClientBase client = getTajoMasterConnection(); checkSessionAndGet(client); @@ -198,11 +198,11 @@ public class SessionConnection implements Closeable { throw new RuntimeException(e); } - throwIfError(response.getState()); + ensureOk(response.getState()); return response.getValue(); } - public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws SQLException { + public Map<String, String> updateSessionVariables(final Map<String, String> variables) { NettyClientBase client = getTajoMasterConnection(); checkSessionAndGet(client); @@ -221,15 +221,12 @@ public class SessionConnection implements Closeable { throw new RuntimeException(e); } - if (isSuccess(response.getState())) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw toSQLException(response.getState()); - } + ensureOk(response.getState()); + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); } - public Map<String, String> unsetSessionVariables(final List<String> variables) throws SQLException { + public Map<String, String> unsetSessionVariables(final List<String> variables) throws NoSuchSessionVariableException { final BlockingInterface stub = getTMStub(); final UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder() @@ -244,12 +241,13 @@ public class SessionConnection implements Closeable { throw new RuntimeException(e); } - if (isSuccess(response.getState())) { - updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); - return Collections.unmodifiableMap(sessionVarsCache); - } else { - throw toSQLException(response.getState()); + if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) { + throw new NoSuchSessionVariableException(response.getState()); } + + ensureOk(response.getState()); + updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); + return Collections.unmodifiableMap(sessionVarsCache); } void updateSessionVarsCache(Map<String, String> variables) { @@ -259,7 +257,7 @@ public class SessionConnection implements Closeable { } } - public String getSessionVariable(final String varname) throws SQLException { + public String getSessionVariable(final String varname) throws NoSuchSessionVariableException { synchronized (sessionVarsCache) { // If a desired variable is client side one and exists in the cache, immediately return the variable. if (sessionVarsCache.containsKey(varname)) { @@ -271,35 +269,41 @@ public class SessionConnection implements Closeable { checkSessionAndGet(client); BlockingInterface stub = client.getStub(); - + StringResponse response; try { - return stub.getSessionVariable(null, getSessionedString(varname)).getValue(); + response = stub.getSessionVariable(null, getSessionedString(varname)); } catch (ServiceException e) { throw new RuntimeException(e); } + + if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) { + throw new NoSuchSessionVariableException(response.getState()); + } + + ensureOk(response.getState()); + return response.getValue(); } - public Boolean existSessionVariable(final String varname) throws SQLException { + public Boolean existSessionVariable(final String varname) { + ReturnState state; try { final BlockingInterface stub = getTMStub(); - ReturnState state = stub.existSessionVariable(null, getSessionedString(varname)); - - if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) { - return false; - } else if (isError(state)){ - throw SQLExceptionUtil.toSQLException(state); - } - - return isSuccess(state); - + state = stub.existSessionVariable(null, getSessionedString(varname)); } catch (ServiceException e) { throw new RuntimeException(e); } + + if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) { + return false; + } + + ensureOk(state); + return true; } - public Map<String, String> getAllSessionVariables() throws SQLException { + public Map<String, String> getAllSessionVariables() { NettyClientBase client = getTajoMasterConnection(); checkSessionAndGet(client); @@ -311,22 +315,29 @@ public class SessionConnection implements Closeable { throw new RuntimeException(e); } - throwIfError(response.getState()); + ensureOk(response.getState()); return ProtoUtil.convertToMap(response.getValue()); } - public Boolean selectDatabase(final String databaseName) throws SQLException { + public Boolean selectDatabase(final String dbName) throws UndefinedDatabaseException { BlockingInterface stub = getTMStub(); boolean selected; try { - selected = isSuccess(stub.selectDatabase(null, getSessionedString(databaseName))); + ReturnState state = stub.selectDatabase(null, getSessionedString(dbName)); + + if (isThisError(state, UNDEFINED_DATABASE)) { + throw new UndefinedDatabaseException(dbName); + } + + selected = ensureOk(state); + } catch (ServiceException e) { throw new RuntimeException(e); } if (selected) { - this.baseDatabase = databaseName; + this.baseDatabase = dbName; } return selected; } @@ -362,7 +373,7 @@ public class SessionConnection implements Closeable { return serviceTracker.getClientServiceAddress(); } - protected void checkSessionAndGet(NettyClientBase client) throws SQLException { + protected void checkSessionAndGet(NettyClientBase client) { if (sessionId == null) { @@ -390,7 +401,7 @@ public class SessionConnection implements Closeable { LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); } } else { - throw SQLExceptionUtil.toSQLException(response.getState()); + throw new InvalidClientSessionException(sessionId.getId()); } } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index b66d451..8c167a4 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -26,8 +26,10 @@ import org.apache.tajo.annotation.ThreadSafe; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; @@ -90,7 +92,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que // QueryClient wrappers /*------------------------------------------------------------------------*/ - public void closeQuery(final QueryId queryId) throws SQLException { + public void closeQuery(final QueryId queryId) { queryClient.closeQuery(queryId); } @@ -98,23 +100,23 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que queryClient.closeNonForwardQuery(queryId); } - public SubmitQueryResponse executeQuery(final String sql) throws SQLException { + public SubmitQueryResponse executeQuery(final String sql) { return queryClient.executeQuery(sql); } - public SubmitQueryResponse executeQueryWithJson(final String json) throws SQLException { + public SubmitQueryResponse executeQueryWithJson(final String json) { return queryClient.executeQueryWithJson(json); } - public ResultSet executeQueryAndGetResult(final String sql) throws SQLException { + public ResultSet executeQueryAndGetResult(final String sql) throws TajoException { return queryClient.executeQueryAndGetResult(sql); } - public ResultSet executeJsonQueryAndGetResult(final String json) throws SQLException { + public ResultSet executeJsonQueryAndGetResult(final String json) throws TajoException { return queryClient.executeJsonQueryAndGetResult(json); } - public QueryStatus getQueryStatus(QueryId queryId) throws SQLException { + public QueryStatus getQueryStatus(QueryId queryId) { return queryClient.getQueryStatus(queryId); } @@ -134,15 +136,15 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return queryClient.fetchNextQueryResult(queryId, fetchRowNum); } - public boolean updateQuery(final String sql) throws SQLException { + public boolean updateQuery(final String sql) throws TajoException { return queryClient.updateQuery(sql); } - public boolean updateQueryWithJson(final String json) throws SQLException { + public boolean updateQueryWithJson(final String json) throws TajoException { return queryClient.updateQueryWithJson(json); } - public QueryStatus killQuery(final QueryId queryId) throws SQLException { + public QueryStatus killQuery(final QueryId queryId) { return queryClient.killQuery(queryId); } @@ -150,7 +152,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que return queryClient.getRunningQueryList(); } - public List<BriefQueryInfo> getFinishedQueryList() throws SQLException { + public List<BriefQueryInfo> getFinishedQueryList() { return queryClient.getFinishedQueryList(); } @@ -178,54 +180,54 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que // CatalogClient wrappers /*------------------------------------------------------------------------*/ - public boolean createDatabase(final String databaseName) throws SQLException { + public boolean createDatabase(final String databaseName) throws DuplicateDatabaseException { return catalogClient.createDatabase(databaseName); } - public boolean existDatabase(final String databaseName) throws SQLException { + public boolean existDatabase(final String databaseName) { return catalogClient.existDatabase(databaseName); } - public boolean dropDatabase(final String databaseName) throws SQLException { + public boolean dropDatabase(final String databaseName) throws UndefinedDatabaseException { return catalogClient.dropDatabase(databaseName); } - public List<String> getAllDatabaseNames() throws SQLException { + public List<String> getAllDatabaseNames() { return catalogClient.getAllDatabaseNames(); } - public boolean existTable(final String tableName) throws SQLException { + public boolean existTable(final String tableName) { return catalogClient.existTable(tableName); } public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, - final TableMeta meta) throws SQLException { + final TableMeta meta) throws DuplicateTableException { return catalogClient.createExternalTable(tableName, schema, path, meta); } public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path, final TableMeta meta, final PartitionMethodDesc partitionMethodDesc) - throws SQLException { + throws DuplicateTableException { return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc); } - public boolean dropTable(final String tableName) throws SQLException { + public boolean dropTable(final String tableName) throws UndefinedTableException { return dropTable(tableName, false); } - public boolean dropTable(final String tableName, final boolean purge) throws SQLException { + public boolean dropTable(final String tableName, final boolean purge) throws UndefinedTableException { return catalogClient.dropTable(tableName, purge); } - public List<String> getTableList(@Nullable final String databaseName) throws SQLException { + public List<String> getTableList(@Nullable final String databaseName) { return catalogClient.getTableList(databaseName); } - public TableDesc getTableDesc(final String tableName) throws SQLException { + public TableDesc getTableDesc(final String tableName) throws UndefinedTableException { return catalogClient.getTableDesc(tableName); } - public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws SQLException { + public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) { return catalogClient.getFunctions(functionName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java index 358f1a0..c79b756 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java @@ -58,7 +58,7 @@ public class TajoClientUtil { return !isQueryWaitingForSchedule(state) && !isQueryRunning(state); } - public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws SQLException { + public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) { QueryStatus status = client.getQueryStatus(queryId); while(!isQueryComplete(status.getState())) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java new file mode 100644 index 0000000..8dce7c4 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import org.apache.tajo.catalog.exception.UndefinedDatabaseException; +import org.apache.tajo.exception.TajoException; + +import java.io.Closeable; +import java.sql.ResultSet; + +/** + * ClientDelegate is a delegate for various wired protocols like protocol buffer, rest API, and proxy. + */ +public interface ClientDelegate extends Closeable { + + int executeUpdate(String sql) throws TajoException; + + ResultSet executeSQL(String sql) throws TajoException; + + QueryFuture executeSQLAsync(String sql) throws TajoException; + + String currentDB(); + + void selectDB(String db) throws UndefinedDatabaseException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java new file mode 100644 index 0000000..44721b3 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.client.v2.exception.ClientUnableToConnectException; + +import java.util.Map; + +public class ClientDelegateFactory { + + public static ClientDelegate newDefaultDelegate(String host, + int port, + @Nullable Map<String, String> props) + throws ClientUnableToConnectException { + + return new LegacyClientDelegate(host, port, props); + } + + public static ClientDelegate newDefaultDelegate(ServiceDiscovery discovery, + @Nullable Map<String, String> props) + throws ClientUnableToConnectException { + + return new LegacyClientDelegate(discovery, props); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java new file mode 100644 index 0000000..b6a00e2 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +public class ClientUtil { + + public static boolean isOk(QueryState state) { + return !(state == QueryState.ERROR || state == QueryState.FAILED); + } + + public static boolean isFailed(QueryState state) { + return state == QueryState.ERROR || state == QueryState.FAILED; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java new file mode 100644 index 0000000..ac6283e --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import java.util.EventListener; + +public interface FutureListener<V> extends EventListener { + void processingCompleted(V future); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java new file mode 100644 index 0000000..a17311b --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java @@ -0,0 +1,485 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractFuture; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.annotation.ThreadSafe; +import org.apache.tajo.auth.UserRoleInfo; +import org.apache.tajo.catalog.exception.UndefinedDatabaseException; +import org.apache.tajo.client.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerException; +import org.apache.tajo.service.TajoMasterInfo; +import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.NetUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.sql.ResultSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.tajo.exception.ReturnStateUtil.ensureOk; + +@ThreadSafe +public class LegacyClientDelegate extends SessionConnection implements ClientDelegate { + + private QueryClientImpl queryClient; + private final ExecutorService executor = Executors.newFixedThreadPool(8); + + public LegacyClientDelegate(String host, int port, Map<String, String> props) { + super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, new KeyValueSet(props)); + queryClient = new QueryClientImpl(this); + } + + public LegacyClientDelegate(ServiceDiscovery discovery, Map<String, String> props) { + super(new DelegateServiceTracker(discovery), null, new KeyValueSet(props)); + queryClient = new QueryClientImpl(this); + } + + @Override + public int executeUpdate(String sql) throws TajoException { + queryClient.updateQuery(sql); + return 0; + } + + @Override + public ResultSet executeSQL(String sql) throws TajoException { + try { + return executeSQLAsync(sql).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public QueryFuture executeSQLAsync(String sql) throws TajoException { + ClientProtos.SubmitQueryResponse response = queryClient.executeQuery(sql); + ClientExceptionUtil.throwIfError(response.getState()); + + QueryId queryId = new QueryId(response.getQueryId()); + + switch (response.getResultType()) { + case ENCLOSED: + return new QueryFutureForEnclosed(queryId, TajoClientUtil.createResultSet(this.queryClient, response, 200)); + case FETCH: + AsyncQueryFuture future = new AsyncQueryFuture(queryId); + executor.execute(future); + return future; + default: + return new QueryFutureForNoFetch(queryId); + } + } + + @Override + public String currentDB() { + return getCurrentDatabase(); + } + + @Override + public void selectDB(String db) throws UndefinedDatabaseException { + selectDatabase(db); + } + + private class QueryFutureForNoFetch implements QueryFuture { + protected final QueryId id; + private final long now = System.currentTimeMillis(); + + QueryFutureForNoFetch(QueryId id) { + this.id = id; + } + + @Override + public String id() { + return id.toString(); + } + + @Override + public String queue() { + return "default"; + } + + @Override + public QueryState state() { + return QueryState.COMPLETED; + } + + @Override + public float progress() { + return 1.0f; + } + + @Override + public boolean isOk() { + return true; + } + + @Override + public boolean isSuccessful() { + return true; + } + + @Override + public boolean isFailed() { + return false; + } + + @Override + public boolean isKilled() { + return false; + } + + @Override + public UserRoleInfo user() { + return UserRoleInfo.getCurrentUser(); + } + + @Override + public void kill() { + } + + @Override + public long submitTime() { + return 0; + } + + @Override + public long startTime() { + return now; + } + + @Override + public long finishTime() { + return now; + } + + @Override + public void release() { + queryClient.closeQuery(id); + } + + @Override + public void addListener(FutureListener<QueryFuture> future) { + future.processingCompleted(this); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public ResultSet get() { + return TajoClientUtil.NULL_RESULT_SET; + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) { + return TajoClientUtil.NULL_RESULT_SET; + } + } + + private class QueryFutureForEnclosed extends QueryFutureForNoFetch { + private final ResultSet resultSet; + QueryFutureForEnclosed(QueryId id, ResultSet resultSet) { + super(id); + this.resultSet = resultSet; + } + + @Override + public ResultSet get() { + return resultSet; + } + + @Override + public ResultSet get(long timeout, TimeUnit unit) { + return resultSet; + } + } + + private class AsyncQueryFuture extends AbstractFuture<ResultSet> implements QueryFuture, Runnable { + private final QueryId queryId; + private volatile QueryState lastState; + private volatile float progress; + private final long submitTime = System.currentTimeMillis(); + private volatile long startTime = 0; + private volatile long finishTime = 0; + + public AsyncQueryFuture(QueryId queryId) { + this.queryId = queryId; + this.lastState = QueryState.SCHEDULED; + } + + @Override + public String id() { + return queryId.toString(); + } + + @Override + public boolean isOk() { + return ClientUtil.isOk(lastState); + } + + @Override + public boolean isSuccessful() { + return lastState == QueryState.COMPLETED; + } + + @Override + public boolean isFailed() { + return ClientUtil.isFailed(lastState); + } + + @Override + public boolean isKilled() { + return queryClient.getQueryStatus(queryId).getState() == TajoProtos.QueryState.QUERY_KILLED; + } + + @Override + public QueryState state() { + return lastState; + } + + @Override + public String queue() { + return "default"; + } + + @Override + public UserRoleInfo user() { + return UserRoleInfo.getCurrentUser(); + } + + @Override + public float progress() { + return progress; + } + + @Override + public void kill() { + queryClient.killQuery(queryId).getState(); + } + + @Override + public long submitTime() { + return submitTime; + } + + @Override + public long startTime() { + return startTime; + } + + @Override + public long finishTime() { + return finishTime; + } + + @Override + public void release() { + queryClient.closeQuery(queryId); + } + + @Override + public void addListener(final FutureListener<QueryFuture> listener) { + final QueryFuture f = this; + addListener(new Runnable() { + @Override + public void run() { + listener.processingCompleted(f); + }}, + executor); + } + + private void updateState(GetQueryStatusResponse lastState) { + this.startTime = lastState.getSubmitTime(); + this.finishTime = lastState.getFinishTime(); + this.progress = lastState.getProgress(); + this.lastState = convert(lastState.getQueryState()); + } + + GetQueryStatusResponse waitCompletion() { + GetQueryStatusResponse response = queryClient.getRawQueryStatus(queryId); + ensureOk(response.getState()); + updateState(response); + + while(!TajoClientUtil.isQueryComplete(response.getQueryState())) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + response = queryClient.getRawQueryStatus(queryId); + updateState(response); + ensureOk(response.getState()); + } + return response; + } + + @Override + public void run() { + GetQueryStatusResponse finalResponse; + try { + finalResponse = waitCompletion(); + } catch (Throwable t) { + setException(t); + return; + } + + if (finalResponse.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { + if (finalResponse.hasHasResult()) { + set(queryClient.getQueryResult(queryId)); + } else { // when update + set(TajoClientUtil.NULL_RESULT_SET); + } + } else { + cancel(false); // failed + set(TajoClientUtil.NULL_RESULT_SET); + } + } + } + + public static class DelegateServiceTracker implements ServiceTracker { + + private final ServiceDiscovery discovery; + DelegateServiceTracker(ServiceDiscovery discovery) { + this.discovery = discovery; + } + + @Override + public boolean isHighAvailable() { + return false; + } + + @Override + public InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException { + return null; + } + + @Override + public InetSocketAddress getClientServiceAddress() throws ServiceTrackerException { + return discovery.clientAddress(); + } + + @Override + public InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public InetSocketAddress getCatalogAddress() throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public int getState(String masterName, TajoConf conf) throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public int formatHA(TajoConf conf) throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public List<String> getMasters(TajoConf conf) throws ServiceTrackerException { + throw new UnimplementedException(); + } + + @Override + public void register() throws IOException { + throw new UnimplementedException(); + } + + @Override + public void delete() throws IOException { + throw new UnimplementedException(); + } + + @Override + public boolean isActiveMaster() { + throw new UnimplementedException(); + } + + @Override + public List<TajoMasterInfo> getMasters() throws IOException { + throw new UnimplementedException(); + } + } + + public static QueryState convert(TajoProtos.QueryState state) { + switch (state) { + case QUERY_NEW: + case QUERY_INIT: + case QUERY_NOT_ASSIGNED: + return QueryState.SCHEDULED; + + case QUERY_MASTER_INIT: + case QUERY_MASTER_LAUNCHED: + case QUERY_RUNNING: + return QueryState.RUNNING; + + case QUERY_KILL_WAIT: + return QueryState.KILLING; + + case QUERY_KILLED: + return QueryState.KILLED; + + case QUERY_FAILED: + return QueryState.FAILED; + + case QUERY_ERROR: + return QueryState.ERROR; + + case QUERY_SUCCEEDED: + return QueryState.COMPLETED; + + default: + throw new RuntimeException("Unknown state:" + state.name()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java new file mode 100644 index 0000000..f1604cd --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import org.apache.tajo.auth.UserRoleInfo; + +import java.sql.ResultSet; +import java.util.concurrent.Future; + +/** + * + */ +public interface QueryFuture extends Future<ResultSet> { + /** + * Get a query id + * + * @return query id + */ + String id(); + + /** + * Get the queue name that the query is running + * + * @return queue name + */ + String queue(); + + /** + * Get a query state + * + * @return query state + */ + QueryState state(); + + /** + * Get a normalized progress (0 ~ 1.0f) of a query running + * + * @return progress + */ + float progress(); + + /** + * A submitted or running query state is normal + * + * @return True if a query state is normal + */ + boolean isOk(); + + /** + * Get whether the query is successfully completed or not. + * + * @return True if the query is successfully completed. + */ + boolean isSuccessful(); + + /** + * Get whether the query is abort due to error. + * + * @return True if the query is abort due to error. + */ + boolean isFailed(); + + /** + * Get whether the query is killed. This is equivalent to + * @{link Future#cancel}. + * + * @return True if the query is already killed. + */ + boolean isKilled(); + + /** + * Get an user information + * + * @return UserRoleInfo + */ + UserRoleInfo user(); + + /** + * Kill this query + */ + void kill(); + + /** + * Get the time when a query is submitted. + * This time can be different from @{link QueryFuture#startTime} + * due to scheduling delay. + * + * @return Millisecond since epoch + */ + long submitTime(); + + /** + * Get the time when a query is actually launched. + * + * @return Millisecond since epoch + */ + long startTime(); + + /** + * Get the time when a query is finished. + * + * @return Millisecond since epoch + */ + long finishTime(); + + /** + * Release a query future. It will be automatically released after the session invalidation. + */ + void release(); + + /** + * Add a listener which will be executed after this query is completed, error, failed or killed. + * + * @param future + */ + void addListener(FutureListener<QueryFuture> future); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java new file mode 100644 index 0000000..24ea386 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +public enum QueryState { + /** successfully submitted */ + SCHEDULED, + /** Running */ + RUNNING, + /** Error before a query execution */ + ERROR, + /** Failure after a query launches */ + FAILED, + /** Killed */ + KILLED, + /** Wait for completely kill */ + KILLING, + /** Successfully completed */ + COMPLETED +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java new file mode 100644 index 0000000..e69ca8a --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import java.net.InetSocketAddress; + +/** + * Client service discovery interface + */ +public interface ServiceDiscovery { + InetSocketAddress clientAddress(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java new file mode 100644 index 0000000..08a921d --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2; + +import org.apache.tajo.catalog.exception.UndefinedDatabaseException; +import org.apache.tajo.client.v2.exception.ClientUnableToConnectException; +import org.apache.tajo.exception.TajoException; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.ResultSet; +import java.util.Map; + +public class TajoClient implements Closeable { + /** + * default client port number + */ + public static final int DEFAULT_PORT = 26002; + + private final ClientDelegate delegate; + + /** + * Initialize TajoClient with a hostname and default port 26002. + * + * @param host hostname to connect + */ + public TajoClient(String host) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, null); + } + + /** + * Initialize TajoClient with a hostname and default port 26002. + * + * @param host Hostname to connect + * @param properties Connection properties + */ + public TajoClient(String host, Map<String, String> properties) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, properties); + } + + /** + * Initialize TajoClient with a hostname and port + * + * @param host Hostname to connect + * @param port Port number to connect + */ + public TajoClient(String host, int port) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, port, null); + } + + /** + * Initialize TajoClient with a hostname and port + * + * @param host Hostname to connect + * @param port Port number to connect + * @param properties Connection properties + */ + public TajoClient(String host, int port, Map<String, String> properties) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(host, port, properties); + } + + /** + * Initialize TajoClient via service discovery protocol + * + * @param discovery Service discovery + */ + public TajoClient(ServiceDiscovery discovery) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(discovery, null); + } + + /** + * Initialize TajoClient via service discovery protocol + * + * @param discovery Service discovery + * @param properties Connection properties + */ + public TajoClient(ServiceDiscovery discovery, Map<String, String> properties) throws ClientUnableToConnectException { + delegate = ClientDelegateFactory.newDefaultDelegate(discovery, properties); + } + + /** + * Submit and executes the given SQL statement, which may be an <code>INSERT (INTO)</code>, + * or <code>CREATE TABLE AS SELECT</code> statement or anSQL statement that returns nothing, + * such as an SQL DDL statement. + * + * @param sql a SQL statement + * @return inserted row number + * @throws TajoException + */ + public int executeUpdate(String sql) throws TajoException { + return delegate.executeUpdate(sql); + } + + /** + * Submit a SQL query statement + * + * @param sql a SQL statement + * @return QueryHandler + * @throws TajoException + */ + public ResultSet executeQuery(String sql) throws TajoException { + return delegate.executeSQL(sql); + } + + /** + * Execute a SQL statement through asynchronous API + * + * @param sql + * @return + * @throws TajoException + */ + public QueryFuture executeQueryAsync(String sql) throws TajoException { + return delegate.executeSQLAsync(sql); + } + + public void close() throws IOException { + delegate.close(); + } + + /** + * Select working database + * + * @param database Database name + * @throws UndefinedDatabaseException + */ + public void selectDB(String database) throws UndefinedDatabaseException { + delegate.selectDB(database); + } + + /** + * Get the current working database + * + * @return Current working database + */ + public String currentDB() { + return delegate.currentDB(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java new file mode 100644 index 0000000..a7fb08a --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2.exception; + +import org.apache.tajo.error.Errors; +import org.apache.tajo.exception.TajoRuntimeException; + +public class ClientConnectionException extends TajoRuntimeException { + public ClientConnectionException(Throwable t) { + super(Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION, t.getMessage()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java new file mode 100644 index 0000000..e567d7d --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client.v2.exception; + +import org.apache.tajo.error.Errors; +import org.apache.tajo.exception.TajoException; + +public class ClientUnableToConnectException extends TajoException { + public ClientUnableToConnectException() { + super(Errors.ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java index 869d7c4..8f15710 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java @@ -87,6 +87,6 @@ public class FetchResultSet extends TajoResultSetBase { currentResultSet.close(); currentResultSet = null; } - tajoClient.closeNonForwardQuery(queryId); + tajoClient.closeQuery(queryId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java index 7cf6e1e..574dc3b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java @@ -18,14 +18,17 @@ package org.apache.tajo.exception; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; + import static org.apache.tajo.error.Errors.ResultCode.AMBIGUOUS_COLUMN; public class AmbiguousColumnException extends TajoException { private static final long serialVersionUID = 3102675985226352347L; - /** - * @param fieldName - */ + public AmbiguousColumnException(ReturnState state) { + super(state); + } + public AmbiguousColumnException(String fieldName) { super(AMBIGUOUS_COLUMN, fieldName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java index d50164d..3b646e1 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java @@ -21,6 +21,7 @@ package org.apache.tajo.exception; import com.google.common.collect.Maps; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.StringUtils; import java.util.Map; @@ -99,6 +100,9 @@ public class ErrorMessages { ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" + " : '%s'", 1); + + ADD_MESSAGE(CLIENT_CONNECTION_EXCEPTION, "Client connection to '%s' has error: %s", 2); + ADD_MESSAGE(CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "Client is unable to establish connection to '%s'", 1); } private static void ADD_MESSAGE(ResultCode code, String msgFormat) { @@ -138,7 +142,7 @@ public class ErrorMessages { } } else { - throw new TajoRuntimeException(code, args); + throw new TajoInternalError("Argument mismatch: code=" + code.name() + ", args=" + StringUtils.join(args)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java new file mode 100644 index 0000000..c9ed78d --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.exception; + +import org.apache.tajo.error.Errors; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; + +public class NoSuchSessionVariableException extends TajoException { + + public NoSuchSessionVariableException(PrimitiveProtos.ReturnState state) { + super(state); + } + + public NoSuchSessionVariableException(String variableName) { + super(Errors.ResultCode.NO_SUCH_SESSION_VARIABLE, variableName); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index fb6b9a5..81554c4 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -45,10 +45,11 @@ public class ReturnStateUtil { OK = builder.build(); } - public static void ensureOk(ReturnState state) { + public static boolean ensureOk(ReturnState state) { if (isError(state)) { throw new TajoRuntimeException(state); } + return true; } public static StringListResponse returnStringList(Collection<String> values) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java index 10b5aff..deeb7f9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java @@ -19,6 +19,7 @@ package org.apache.tajo.exception; import com.google.common.collect.Maps; +import org.apache.log4j.spi.ErrorCode; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.exception.ErrorMessages; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; @@ -54,27 +55,34 @@ public class SQLExceptionUtil { } } - public static SQLException toSQLException(ReturnState state) throws SQLException { - - if (SQLSTATES.containsKey(state.getReturnCode())) { + private static SQLException toSQLException(ResultCode code, String message) throws SQLException { + if (SQLSTATES.containsKey(code)) { return new SQLException( - state.getMessage(), - SQLSTATES.get(state.getReturnCode()), - state.getReturnCode().getNumber() + message, + SQLSTATES.get(code), + code.getNumber() ); } else { // If there is no SQLState corresponding to error code, // It will make SQLState '42000' (Syntax Error Or Access Rule Violation). return new SQLException( - state.getMessage(), + message, "42000", ResultCode.SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION_VALUE ); } } + public static SQLException toSQLException(TajoException e) throws SQLException { + return toSQLException(e.getErrorCode(), e.getMessage()); + } + + public static SQLException toSQLException(ReturnState state) throws SQLException { + return toSQLException(state.getReturnCode(), state.getMessage()); + } + public static SQLException makeSQLException(ResultCode code, String ...args) { if (SQLSTATES.containsKey(code)) { return new SQLException( http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java index dbb2748..765ead3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java @@ -19,6 +19,8 @@ package org.apache.tajo.exception; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; /** * Unrecoverable errors @@ -26,6 +28,11 @@ import org.apache.tajo.error.Errors.ResultCode; public class TajoError extends Error implements TajoExceptionInterface { private ResultCode code; + public TajoError(ReturnState state) { + super(state.getMessage()); + code = state.getReturnCode(); + } + public TajoError(ResultCode code) { super(ErrorMessages.getMessage(code)); this.code = code; http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java index 781d1a0..e0e2ccb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java @@ -19,6 +19,7 @@ package org.apache.tajo.exception; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; /** * TajoException contains all exceptions with any exact reason. @@ -27,6 +28,16 @@ import org.apache.tajo.error.Errors.ResultCode; public class TajoException extends Exception implements TajoExceptionInterface { private ResultCode code; + public TajoException(ReturnState e) { + super(e.getMessage()); + this.code = e.getReturnCode(); + } + + public TajoException(TajoRuntimeException e) { + super(e.getMessage()); + this.code = e.getErrorCode(); + } + public TajoException(ResultCode code) { super(ErrorMessages.getMessage(code)); this.code = code; http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java index 767c13c..072636b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java @@ -19,12 +19,18 @@ package org.apache.tajo.exception; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; /** * Exception for Internal Bugs and Unexpected exception */ public class TajoInternalError extends TajoError { + public TajoInternalError(ReturnState state) { + super(state); + } + public TajoInternalError(String message) { super(ResultCode.INTERNAL_ERROR, message); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java index 9d91946..70586c9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java @@ -19,9 +19,14 @@ package org.apache.tajo.exception; import org.apache.tajo.error.Errors; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; public class UndefinedOperatorException extends TajoException { + public UndefinedOperatorException(ReturnState state) { + super(state); + } + public UndefinedOperatorException(String operation) { super(Errors.ResultCode.UNDEFINED_OPERATOR, operation); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java index 9ca5539..a7a3915 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java @@ -19,10 +19,15 @@ package org.apache.tajo.exception; import org.apache.tajo.error.Errors; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; public class UnsupportedException extends TajoRuntimeException { private static final long serialVersionUID = 6702291354858193578L; + public UnsupportedException(ReturnState state) { + super(state); + } + public UnsupportedException(String featureName) { super(Errors.ResultCode.FEATURE_NOT_SUPPORTED, featureName); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 7489503..60e02a6 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -54,8 +54,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>1.6</source> - <target>1.6</target> + <source>1.7</source> + <target>1.7</target> <encoding>${project.build.sourceEncoding}</encoding> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java index b4a28db..90c95a1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java @@ -28,6 +28,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.FileUtil; @@ -91,7 +92,7 @@ public abstract class BenchmarkSet { public abstract void loadQueries() throws IOException; - public abstract void loadTables() throws SQLException; + public abstract void loadTables() throws TajoException; public String [] getTableNames() { return schemas.keySet().toArray(new String[schemas.size()]); http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java index 91a3b66..9739767 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java @@ -32,6 +32,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.storage.StorageConstants; import java.io.File; @@ -192,7 +193,7 @@ public class TPCH extends BenchmarkSet { loadQueries(BENCHMARK_DIR); } - public void loadTables() throws SQLException { + public void loadTables() throws TajoException { loadTable(LINEITEM); loadTable(CUSTOMER); loadTable(CUSTOMER_PARTS); @@ -206,7 +207,7 @@ public class TPCH extends BenchmarkSet { } - public void loadTable(String tableName) throws SQLException { + public void loadTable(String tableName) throws TajoException { TableMeta meta = CatalogUtil.newTableMeta("TEXT"); meta.putOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 67f782a..07445a4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -29,6 +29,9 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.AmbiguousFunctionException; +import org.apache.tajo.catalog.exception.CatalogException; +import org.apache.tajo.catalog.exception.UndefinedFunctionException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; @@ -37,10 +40,7 @@ import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.InternalException; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.Target; @@ -346,31 +346,32 @@ public class GlobalPlanner { } } - private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws InternalException { - FunctionDesc functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION, + private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws CatalogException { + FunctionDesc functionDesc = null; + functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION, args[0].getValueType()); return new AggregationFunctionCallEval(functionDesc, args); } - private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws InternalException { + private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws CatalogException { FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION, args[0].getValueType()); return new AggregationFunctionCallEval(functionDesc, args); } - private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws InternalException { + private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws CatalogException { FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION, new TajoDataTypes.DataType[]{}); return new AggregationFunctionCallEval(functionDesc, args); } - private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws InternalException { + private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws CatalogException { FunctionDesc functionDesc = getCatalog().getFunction("max", CatalogProtos.FunctionType.AGGREGATION, args[0].getValueType()); return new AggregationFunctionCallEval(functionDesc, args); } - private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws InternalException { + private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws CatalogException { FunctionDesc functionDesc = getCatalog().getFunction("min", CatalogProtos.FunctionType.AGGREGATION, args[0].getValueType()); return new AggregationFunctionCallEval(functionDesc, args); @@ -428,57 +429,53 @@ public class GlobalPlanner { */ private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanContext context, AggregationFunctionCallEval function) - throws PlanningException { + throws TajoException { LogicalPlan plan = context.plan.getLogicalPlan(); RewrittenFunctions rewritten = null; - try { - if (function.getName().equalsIgnoreCase("count")) { - rewritten = new RewrittenFunctions(1); - - if (function.getArgs().length == 0) { - rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs()); - } else { - rewritten.firstStageEvals[0] = createCountFunction(function.getArgs()); - } - String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); - FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); - rewritten.firstStageTargets[0] = new Target(fieldEval); - rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval}); - } else if (function.getName().equalsIgnoreCase("sum")) { - rewritten = new RewrittenFunctions(1); - - rewritten.firstStageEvals[0] = createSumFunction(function.getArgs()); - String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); - FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); - rewritten.firstStageTargets[0] = new Target(fieldEval); - rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval}); - - } else if (function.getName().equals("max")) { - rewritten = new RewrittenFunctions(1); - - rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs()); - String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); - FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); - rewritten.firstStageTargets[0] = new Target(fieldEval); - rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval}); - - } else if (function.getName().equals("min")) { - - rewritten = new RewrittenFunctions(1); - - rewritten.firstStageEvals[0] = createMinFunction(function.getArgs()); - String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); - FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); - rewritten.firstStageTargets[0] = new Target(fieldEval); - rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval}); + if (function.getName().equalsIgnoreCase("count")) { + rewritten = new RewrittenFunctions(1); + if (function.getArgs().length == 0) { + rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs()); } else { - throw new PlanningException("Cannot support a mix of other functions"); + rewritten.firstStageEvals[0] = createCountFunction(function.getArgs()); } - } catch (InternalException e) { - LOG.error(e, e); + String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); + FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); + rewritten.firstStageTargets[0] = new Target(fieldEval); + rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval}); + } else if (function.getName().equalsIgnoreCase("sum")) { + rewritten = new RewrittenFunctions(1); + + rewritten.firstStageEvals[0] = createSumFunction(function.getArgs()); + String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); + FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); + rewritten.firstStageTargets[0] = new Target(fieldEval); + rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval}); + + } else if (function.getName().equals("max")) { + rewritten = new RewrittenFunctions(1); + + rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs()); + String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); + FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); + rewritten.firstStageTargets[0] = new Target(fieldEval); + rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval}); + + } else if (function.getName().equals("min")) { + + rewritten = new RewrittenFunctions(1); + + rewritten.firstStageEvals[0] = createMinFunction(function.getArgs()); + String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]); + FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType()); + rewritten.firstStageTargets[0] = new Target(fieldEval); + rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval}); + + } else { + throw new UnsupportedException("Cannot support a mix of other functions"); } return rewritten; @@ -523,7 +520,7 @@ public class GlobalPlanner { */ private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPlanContext context, ExecutionBlock latestExecBlock, - GroupbyNode groupbyNode) throws PlanningException { + GroupbyNode groupbyNode) throws TajoException { Column [] originalGroupingColumns = groupbyNode.getGroupingColumns(); LinkedHashSet<Column> firstStageGroupingColumns = http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index b70a79f..d16f7d1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -38,6 +38,7 @@ import org.apache.tajo.catalog.CatalogServer; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.LocalCatalogWrapper; +import org.apache.tajo.catalog.exception.DuplicateDatabaseException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; @@ -369,7 +370,7 @@ public class TajoMaster extends CompositeService { } } - private void checkBaseTBSpaceAndDatabase() throws IOException { + private void checkBaseTBSpaceAndDatabase() throws IOException, DuplicateDatabaseException { if (!catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR)); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java index 40ebf4e..a43b95e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java @@ -28,6 +28,7 @@ import org.apache.tajo.catalog.exception.DuplicateTableException; import org.apache.tajo.catalog.exception.UndefinedTablespaceException; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.plan.logical.CreateTableNode; @@ -52,7 +53,7 @@ public class CreateTableExecutor { } public TableDesc create(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists) - throws IOException { + throws IOException, TajoException { TableMeta meta; if (createTable.hasOptions()) { @@ -85,7 +86,7 @@ public class CreateTableExecutor { @Nullable URI uri, boolean isExternal, @Nullable PartitionMethodDesc partitionDesc, - boolean ifNotExists) throws IOException { + boolean ifNotExists) throws IOException, TajoException { Pair<String, String> separatedNames = getQualifiedName(queryContext.getCurrentDatabase(), tableName); String databaseName = separatedNames.getFirst(); @@ -119,7 +120,7 @@ public class CreateTableExecutor { } } - private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) { + private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) throws DuplicateTableException { if (ifNotExists) { LOG.info("relation \"" + qualifiedName + "\" is already exists."); return catalog.getTableDesc(qualifiedName); @@ -131,13 +132,15 @@ public class CreateTableExecutor { private Pair<String, String> getQualifiedName(String currentDatabase, String tableName) { if (CatalogUtil.isFQTableName(tableName)) { String [] splitted = CatalogUtil.splitFQTableName(tableName); - return new Pair<String, String>(splitted[0], splitted[1]); + return new Pair<>(splitted[0], splitted[1]); } else { - return new Pair<String, String>(currentDatabase, tableName); + return new Pair<>(currentDatabase, tableName); } } - private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri) { + private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri) + throws UndefinedTablespaceException { + if (tableSpaceName != null) { Optional<Tablespace> ts = (Optional<Tablespace>) TablespaceManager.getByName(tableSpaceName); if (ts.isPresent()) {
