TAJO-1338: Defines RESTful API for Clients. Closes #490
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/32b524d7 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/32b524d7 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/32b524d7 Branch: refs/heads/master Commit: 32b524d74f5e4a6a37a4b99c43830f8401fc9e45 Parents: 696d2aa Author: Jihun Kang <[email protected]> Authored: Mon Apr 6 11:48:46 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Mon Apr 6 11:48:46 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/catalog/TableDesc.java | 1 + .../java/org/apache/tajo/conf/TajoConf.java | 3 + tajo-core/pom.xml | 8 + .../java/org/apache/tajo/master/TajoMaster.java | 17 + .../exec/NonForwardQueryResultFileScanner.java | 5 + .../exec/NonForwardQueryResultScanner.java | 2 + .../NonForwardQueryResultSystemScanner.java | 5 + .../apache/tajo/ws/rs/ClientApplication.java | 179 ++++++++ .../tajo/ws/rs/JerseyResourceDelegate.java | 31 ++ .../ws/rs/JerseyResourceDelegateContext.java | 64 +++ .../ws/rs/JerseyResourceDelegateContextKey.java | 87 ++++ .../tajo/ws/rs/JerseyResourceDelegateUtil.java | 61 +++ .../org/apache/tajo/ws/rs/ResourcesUtil.java | 47 ++ .../org/apache/tajo/ws/rs/TajoRestService.java | 136 ++++++ .../tajo/ws/rs/requests/NewDatabaseRequest.java | 34 ++ .../tajo/ws/rs/requests/NewSessionRequest.java | 49 ++ .../tajo/ws/rs/requests/SubmitQueryRequest.java | 34 ++ .../tajo/ws/rs/resources/ClusterResource.java | 117 +++++ .../tajo/ws/rs/resources/DatabasesResource.java | 339 ++++++++++++++ .../tajo/ws/rs/resources/FunctionsResource.java | 121 +++++ .../tajo/ws/rs/resources/QueryResource.java | 448 +++++++++++++++++++ .../ws/rs/resources/QueryResultResource.java | 415 +++++++++++++++++ .../tajo/ws/rs/resources/SessionsResource.java | 380 ++++++++++++++++ .../tajo/ws/rs/resources/TablesResource.java | 350 +++++++++++++++ .../ws/rs/responses/DatabaseInfoResponse.java | 
48 ++ .../tajo/ws/rs/responses/ExceptionResponse.java | 33 ++ .../responses/GetQueryResultDataResponse.java | 72 +++ .../ws/rs/responses/NewSessionResponse.java | 76 ++++ .../ws/rs/responses/ResultSetInfoResponse.java | 43 ++ .../responses/WorkerConnectionInfoResponse.java | 67 +++ .../tajo/ws/rs/responses/WorkerResponse.java | 76 ++++ .../org/apache/tajo/TajoTestingCluster.java | 5 + .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 2 + .../tajo/ws/rs/resources/RestTestUtils.java | 64 +++ .../ws/rs/resources/TestClusterResource.java | 84 ++++ .../ws/rs/resources/TestDatabasesResource.java | 189 ++++++++ .../ws/rs/resources/TestFunctionsResource.java | 78 ++++ .../tajo/ws/rs/resources/TestQueryResource.java | 170 +++++++ .../rs/resources/TestQueryResultResource.java | 287 ++++++++++++ .../ws/rs/resources/TestSessionsResource.java | 263 +++++++++++ .../ws/rs/resources/TestTablesResource.java | 195 ++++++++ tajo-project/pom.xml | 5 + tajo-rpc/tajo-ws-rs/pom.xml | 4 + .../apache/tajo/ws/rs/ResourceConfigUtil.java | 38 ++ .../tajo/ws/rs/netty/gson/GsonFeature.java | 25 +- .../tajo/ws/rs/netty/gson/GsonReader.java | 23 +- .../apache/tajo/ws/rs/netty/gson/GsonUtil.java | 2 +- .../tajo/ws/rs/netty/gson/GsonWriter.java | 23 +- .../tajo/ws/rs/netty/testapp2/Directory.java | 8 +- 50 files changed, 4807 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4ec678f..efadea8 100644 --- a/CHANGES +++ b/CHANGES @@ -155,6 +155,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1338: Defines RESTful API for Clients. (jihun) + TAJO-1284: Add alter partition method to CatalogStore. (jaehwa) TAJO-1392: Resolve findbug warnings on Tajo Plan Module. 
(jihun) http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java index ec679f9..ee096a2 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java @@ -22,6 +22,7 @@ import com.google.common.base.Objects; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.Expose; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.json.CatalogGsonHelper; http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 221b341..e892dc9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -135,6 +135,9 @@ public class TajoConf extends Configuration { Validators.networkAddr()), TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()), + // Tajo Rest Service + REST_SERVICE_PORT("tajo.rest.service.port", 26880), + // High availability configurations TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()), TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/pom.xml 
---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 61a156b..19c9ba3 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -283,6 +283,10 @@ <groupId>org.apache.tajo</groupId> <artifactId>tajo-thirdparty-asm</artifactId> </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-ws-rs</artifactId> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -368,6 +372,10 @@ <artifactId>antlr4</artifactId> </dependency> <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 371dfb4..51f82f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -59,6 +59,7 @@ import org.apache.tajo.util.history.HistoryWriter; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.QueryExecutorServlet; import org.apache.tajo.webapp.StaticHttpServer; +import org.apache.tajo.ws.rs.TajoRestService; import java.io.*; import java.lang.management.ManagementFactory; @@ -119,6 +120,7 @@ public class TajoMaster extends CompositeService { private WorkerResourceManager resourceManager; //Web Server private StaticHttpServer webServer; + private TajoRestService restServer; private QueryManager queryManager; @@ -193,6 +195,9 @@ public class TajoMaster extends CompositeService { tajoMasterService = new QueryCoordinatorService(context); addIfService(tajoMasterService); + + restServer = new 
TajoRestService(context); + addIfService(restServer); } catch (Exception e) { LOG.error(e.getMessage(), e); throw e; @@ -375,6 +380,14 @@ public class TajoMaster extends CompositeService { LOG.error(e, e); } } + + if (restServer != null) { + try { + restServer.stop(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } if (webServer != null) { try { @@ -474,6 +487,10 @@ public class TajoMaster extends CompositeService { public HistoryReader getHistoryReader() { return historyReader; } + + public TajoRestService getRestServer() { + return restServer; + } } String getThreadTaskName(long id, String name) { http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 6c02aa9..804821b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -143,6 +143,11 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc return tableDesc; } + @Override + public int getCurrentRowNumber() { + return currentNumRows; + } + public void close() throws Exception { if (scanExec != null) { scanExec.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java index 86d2843..75608df 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java @@ -43,4 +43,6 @@ public interface NonForwardQueryResultScanner { public void init() throws IOException; + public int getCurrentRowNumber(); + } http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 80bdb86..958c252 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -620,6 +620,11 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult public Schema getLogicalSchema() { return outSchema; } + + @Override + public int getCurrentRowNumber() { + return currentRow; + } class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl { http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java new file mode 100644 index 0000000..943f356 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs; + +import org.apache.tajo.QueryId; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.exec.NonForwardQueryResultScanner; +import org.apache.tajo.ws.rs.resources.ClusterResource; +import org.apache.tajo.ws.rs.resources.DatabasesResource; +import org.apache.tajo.ws.rs.resources.FunctionsResource; +import org.apache.tajo.ws.rs.resources.QueryResource; +import org.apache.tajo.ws.rs.resources.SessionsResource; +import org.apache.tajo.ws.rs.resources.TablesResource; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import javax.ws.rs.core.Application; + +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** + * It loads client classes for Tajo REST protocol. 
+ */ +public class ClientApplication extends Application { + + private final MasterContext masterContext; + private final ConcurrentMap<QueryId, Long> queryIdToResultSetCacheIdMap; + private final Cache<Long, NonForwardQueryResultScanner> queryResultScannerCache; + + private final SecureRandom secureRandom; + + public ClientApplication(MasterContext masterContext) { + this.masterContext = masterContext; + + this.secureRandom = new SecureRandom(); + + this.queryIdToResultSetCacheIdMap = new ConcurrentHashMap<QueryId, Long>(); + this.queryResultScannerCache = CacheBuilder.newBuilder() + .concurrencyLevel(4) + .maximumSize(1000) + .expireAfterAccess(30, TimeUnit.MINUTES) + .build(); + } + + @Override + public Set<Class<?>> getClasses() { + Set<Class<?>> classes = new HashSet<Class<?>>(); + + classes.add(SessionsResource.class); + classes.add(DatabasesResource.class); + classes.add(TablesResource.class); + classes.add(FunctionsResource.class); + classes.add(ClusterResource.class); + classes.add(QueryResource.class); + + return classes; + } + + public MasterContext getMasterContext() { + return masterContext; + } + + /** + * It returns generated 8-byte size integer. + * + * @return + */ + private long generateCacheId() { + byte[] generatedBytes = new byte[8]; + long generatedId = 0; + + secureRandom.nextBytes(generatedBytes); + for (byte generatedByte: generatedBytes) { + generatedId = (generatedId << 8) + (generatedByte & 0xff); + } + + return generatedId; + } + + /** + * If cannot find any cache id for supplied query id, it will generate a new cache id. 
+ * + * @param queryId + * @return + */ + public long generateCacheIdIfAbsent(QueryId queryId) { + Long cacheId = this.queryIdToResultSetCacheIdMap.get(queryId); + long newCacheId = 0; + + if (cacheId == null) { + boolean generated = false; + do { + newCacheId = generateCacheId(); + if (queryResultScannerCache.getIfPresent(newCacheId) == null) { + generated = true; + } + } while (!generated); + cacheId = this.queryIdToResultSetCacheIdMap.putIfAbsent(queryId, newCacheId); + if (cacheId != null) { + newCacheId = cacheId.longValue(); + } + } else { + newCacheId = cacheId.longValue(); + } + + return newCacheId; + } + + /** + * get cached NonForwardResultScanner instance by query id and cache id. + * + * @param queryId + * @param cacheId + * @return + */ + public NonForwardQueryResultScanner getCachedNonForwardResultScanner(QueryId queryId, long cacheId) { + Long cachedCacheId = queryIdToResultSetCacheIdMap.get(queryId); + + if (cachedCacheId == null) { + throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid."); + } + + if (cacheId != cachedCacheId.longValue()) { + throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid. " + + "Please use the valid cache id."); + } + + return queryResultScannerCache.getIfPresent(cachedCacheId); + } + + /** + * Store NonForwardResultScanner instance to cached memory if not present. 
+ * + * @param queryId + * @param cacheId + * @param resultScanner + */ + public NonForwardQueryResultScanner setCachedNonForwardResultScanner(QueryId queryId, long cacheId, + NonForwardQueryResultScanner resultScanner) { + NonForwardQueryResultScanner cachedScanner = null; + + if (cacheId == 0) { + cacheId = generateCacheIdIfAbsent(queryId); + } + + cachedScanner = getCachedNonForwardResultScanner(queryId, cacheId); + if (cachedScanner == null) { + cachedScanner = this.queryResultScannerCache.asMap().putIfAbsent(cacheId, resultScanner); + if (cachedScanner == null) { + cachedScanner = resultScanner; + } + } + + return cachedScanner; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java new file mode 100644 index 0000000..89f7a28 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs; + +import javax.ws.rs.core.Response; + +/** + * Implements business flows for jersey resource + * + */ +public interface JerseyResourceDelegate { + + public Response run(JerseyResourceDelegateContext context); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java new file mode 100644 index 0000000..9f1fb4a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs; + +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * It holds variables for running deletegates + */ +public class JerseyResourceDelegateContext { + + private final ConcurrentMap<JerseyResourceDelegateContextKey<?>, Object> contextMap = + new ConcurrentHashMap<JerseyResourceDelegateContextKey<?>, Object>(); + + /** + * Add value to Context. If value exists, it will overwrite. + * + * @param key + * @param value + * @return + */ + public <T> JerseyResourceDelegateContext put(JerseyResourceDelegateContextKey<T> key, T value) { + contextMap.put(key, value); + return this; + } + + public <T> T get(JerseyResourceDelegateContextKey<T> key) { + Class<T> keyTypeClass = key.getType(); + Object object = contextMap.get(key); + if (object != null) { + return keyTypeClass.cast(contextMap.get(key)); + } else { + if (Boolean.class.isAssignableFrom(keyTypeClass)) { + return keyTypeClass.cast(false); + } else if (Number.class.isAssignableFrom(keyTypeClass)) { + try { + return keyTypeClass.getConstructor(String.class).newInstance("0"); + } catch (Throwable e) { + return null; + } + } else { + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java new file mode 100644 index 0000000..3c059d4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.ws.rs.core.GenericType; + +public class JerseyResourceDelegateContextKey<T> { + + private static final ConcurrentMap<String, JerseyResourceDelegateContextKey<?>> keyMap = + new ConcurrentHashMap<String, JerseyResourceDelegateContextKey<?>>(); + + private final String name; + private final Class<T> type; + + private JerseyResourceDelegateContextKey(String name, Class<T> type) { + this.name = name; + this.type = type; + } + + public static <T> JerseyResourceDelegateContextKey<T> valueOf(String name, Class<T> type) { + if (name == null || name.isEmpty()) { + throw new RuntimeException("name cannnot be null or empty."); + } + + JerseyResourceDelegateContextKey<T> key = (JerseyResourceDelegateContextKey<T>) keyMap.get(name); + if (key == null) { + key = new JerseyResourceDelegateContextKey<T>(name, type); + keyMap.putIfAbsent(name, key); + } + + return key; + } + + @SuppressWarnings("unchecked") + public static <T> JerseyResourceDelegateContextKey<T> valueOf(String name, GenericType<T> genericType) { + return (JerseyResourceDelegateContextKey<T>) valueOf(name, genericType.getRawType()); + } + + public Class<T> getType() { + return type; 
+ } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + JerseyResourceDelegateContextKey other = (JerseyResourceDelegateContextKey) obj; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + return true; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java new file mode 100644 index 0000000..365f3b8 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs; + +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; + +import org.apache.commons.logging.Log; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.ws.rs.ResourcesUtil; + +public class JerseyResourceDelegateUtil { + + public static final String ClientApplicationKey = "ClientApplication"; + public static final String MasterContextKey = "MasterContextKey"; + public static final String UriInfoKey = "UriInfoKey"; + + public static Response runJerseyResourceDelegate(JerseyResourceDelegate delegate, + Application application, + JerseyResourceDelegateContext context, + Log log) { + Application localApp = ResourceConfigUtil.getJAXRSApplication(application); + + if ((localApp != null) && (localApp instanceof ClientApplication)) { + ClientApplication clientApplication = (ClientApplication) localApp; + JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey = + JerseyResourceDelegateContextKey.valueOf(ClientApplicationKey, ClientApplication.class); + context.put(clientApplicationKey, clientApplication); + + MasterContext masterContext = clientApplication.getMasterContext(); + + if (masterContext != null) { + JerseyResourceDelegateContextKey<MasterContext> key = + JerseyResourceDelegateContextKey.valueOf(MasterContextKey, MasterContext.class); + context.put(key, masterContext); + + return delegate.run(context); + } else { + return ResourcesUtil.createExceptionResponse(log, "MasterContext is null."); + } + } else { + return ResourcesUtil.createExceptionResponse(log, "Invalid injection on SessionsResource."); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java new file mode 100644 
index 0000000..5794636 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs; + +import org.apache.commons.logging.Log; +import org.apache.tajo.ws.rs.responses.ExceptionResponse; + +import javax.ws.rs.core.Response; + +public class ResourcesUtil { + + public static Response createExceptionResponse(Log log, String message) { + if (log != null) { + log.error(message); + } + + ExceptionResponse response = new ExceptionResponse(); + response.setMessage(message); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(response).build(); + } + + public static Response createBadRequestResponse(Log log, String message) { + if (log != null) { + log.error(message); + } + + ExceptionResponse response = new ExceptionResponse(); + response.setMessage(message); + return Response.status(Response.Status.BAD_REQUEST).entity(response).build(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java new file mode 100644 index 0000000..6da2b32 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.json.FunctionAdapter; +import org.apache.tajo.catalog.json.TableMetaAdapter; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.function.Function; +import org.apache.tajo.json.ClassNameSerializer; +import org.apache.tajo.json.DataTypeAdapter; +import org.apache.tajo.json.DatumAdapter; +import org.apache.tajo.json.GsonSerDerAdapter; +import org.apache.tajo.json.PathSerializer; +import org.apache.tajo.json.TimeZoneGsonSerdeAdapter; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.plan.function.AggFunction; +import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.serder.EvalNodeAdapter; +import org.apache.tajo.plan.serder.LogicalNodeAdapter; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.ws.rs.netty.NettyRestServer; +import org.apache.tajo.ws.rs.netty.NettyRestServerFactory; +import org.apache.tajo.ws.rs.netty.gson.GsonFeature; +import org.glassfish.jersey.filter.LoggingFilter; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.server.ServerProperties; + +import java.lang.reflect.Type; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Map; +import java.util.TimeZone; + +public class TajoRestService extends CompositeService { + + private final Log LOG = LogFactory.getLog(getClass()); + + private MasterContext masterContext; + private NettyRestServer restServer; + + public TajoRestService(MasterContext masterContext) { + 
super(TajoRestService.class.getName()); + + this.masterContext = masterContext; + } + + private Map<Type, GsonSerDerAdapter<?>> registerTypeAdapterMap() { + Map<Type, GsonSerDerAdapter<?>> adapters = TUtil.newHashMap(); + adapters.put(Path.class, new PathSerializer()); + adapters.put(Class.class, new ClassNameSerializer()); + adapters.put(LogicalNode.class, new LogicalNodeAdapter()); + adapters.put(EvalNode.class, new EvalNodeAdapter()); + adapters.put(TableMeta.class, new TableMetaAdapter()); + adapters.put(Function.class, new FunctionAdapter()); + adapters.put(GeneralFunction.class, new FunctionAdapter()); + adapters.put(AggFunction.class, new FunctionAdapter()); + adapters.put(Datum.class, new DatumAdapter()); + adapters.put(DataType.class, new DataTypeAdapter()); + adapters.put(TimeZone.class, new TimeZoneGsonSerdeAdapter()); + + return adapters; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + GsonFeature gsonFeature = new GsonFeature(registerTypeAdapterMap()); + + ClientApplication clientApplication = new ClientApplication(masterContext); + ResourceConfig resourceConfig = ResourceConfig.forApplication(clientApplication) + .register(gsonFeature) + .register(LoggingFilter.class) + .property(ServerProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true) + .property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true); + TajoConf tajoConf = (TajoConf) conf; + + int port = TajoConf.getIntVar(tajoConf, TajoConf.ConfVars.REST_SERVICE_PORT); + URI restServiceURI = new URI("http", null, "0.0.0.0", port, "/rest", null, null); + int workerCount = TajoConf.getIntVar(tajoConf, TajoConf.ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM); + restServer = NettyRestServerFactory.createNettyRestServer(restServiceURI, resourceConfig, workerCount, false); + + super.serviceInit(conf); + + LOG.info("Tajo Rest Service initialized."); + } + + @Override + protected void serviceStart() throws Exception { + restServer.start(); + + super.serviceStart(); 
+ + LOG.info("Tajo Rest Service started."); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + + if (restServer != null) { + restServer.shutdown(); + } + + LOG.info("Tajo Rest Service stopped."); + } + + public InetSocketAddress getBindAddress() { + return restServer.getListenAddress(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java new file mode 100644 index 0000000..85f68d5 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.requests; + +import com.google.gson.annotations.Expose; + +public class NewDatabaseRequest { + @Expose private String databaseName; + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java new file mode 100644 index 0000000..8069179 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.requests; + +import com.google.gson.annotations.Expose; + +public class NewSessionRequest { + + @Expose private String userName; + @Expose private String databaseName; + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + @Override + public String toString() { + return "NewSessionRequest [userName=" + userName + ", databaseName=" + databaseName + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java new file mode 100644 index 0000000..0bf5678 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.requests; + +import com.google.gson.annotations.Expose; + +public class SubmitQueryRequest { + + @Expose private String query; + + public String getQuery() { + return query; + } + public void setQuery(String query) { + this.query = query; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java new file mode 100644 index 0000000..349fa4c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.resources; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.ws.rs.JerseyResourceDelegate; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; +import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil; +import org.apache.tajo.ws.rs.ResourcesUtil; +import org.apache.tajo.ws.rs.responses.WorkerResponse; + +@Path("/cluster") +public class ClusterResource { + + private static final Log LOG = LogFactory.getLog(ClusterResource.class); + + @Context + UriInfo uriInfo; + + @Context + Application application; + + JerseyResourceDelegateContext context; + + private static final String workersName = "workers"; + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getClusterInfo() { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get cluster info request."); + } + + Response response = null; + + try { + initializeContext(); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetClusterInfoDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = 
ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetClusterInfoDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers(); + List<WorkerResponse> workerList = new ArrayList<WorkerResponse>(); + + for (Worker worker: workerMap.values()) { + workerList.add(new WorkerResponse(worker)); + } + + Map<String, List<WorkerResponse>> workerResponseMap = new HashMap<String, List<WorkerResponse>>(); + workerResponseMap.put(workersName, workerList); + + return Response.ok(workerResponseMap).build(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java new file mode 100644 index 0000000..3868cab --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriInfo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegate; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; +import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil; +import org.apache.tajo.ws.rs.ResourcesUtil; +import org.apache.tajo.ws.rs.requests.NewDatabaseRequest; +import org.apache.tajo.ws.rs.responses.DatabaseInfoResponse; + +/** + * Deals with Database Management + */ +@Path("/databases") +public class DatabasesResource { + + private static final Log LOG = LogFactory.getLog(DatabasesResource.class); + + @Context + UriInfo uriInfo; 
+ + @Context + Application application; + + JerseyResourceDelegateContext context; + + private static final String databasesKeyName = "databases"; + private static final String newDatabaseRequestKeyName = "NewDatabaseKey"; + private static final String databaseNameKeyName = "databaseName"; + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + } + + /** + * Get all databases from catalog server + * + * @return + */ + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getAllDatabases() { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent retrieve all databases request."); + } + + Response response = null; + try { + initializeContext(); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetAllDatabasesDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetAllDatabasesDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + Collection<String> databaseNames = masterContext.getCatalog().getAllDatabaseNames(); + Map<String, Collection<String>> databaseNamesMap = new HashMap<String, Collection<String>>(); + databaseNamesMap.put(databasesKeyName, databaseNames); + return Response.ok(databaseNamesMap).build(); + } + } + + /** + * + * @param request + * @return + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + public Response 
createNewDatabase(NewDatabaseRequest request) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a new database creation request"); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<NewDatabaseRequest> newDatabaseRequestKey = + JerseyResourceDelegateContextKey.valueOf(newDatabaseRequestKeyName, NewDatabaseRequest.class); + context.put(newDatabaseRequestKey, request); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new CreateNewDatabaseDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class CreateNewDatabaseDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<NewDatabaseRequest> newDatabaseRequestKey = + JerseyResourceDelegateContextKey.valueOf(newDatabaseRequestKeyName, NewDatabaseRequest.class); + NewDatabaseRequest request = context.get(newDatabaseRequestKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + UriInfo uriInfo = context.get(uriInfoKey); + + if (request.getDatabaseName() == null || request.getDatabaseName().isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "databaseName is null or empty."); + } + + boolean databaseCreated = + masterContext.getCatalog().createDatabase(request.getDatabaseName(), + TajoConstants.DEFAULT_TABLESPACE_NAME); + + if (databaseCreated) { + URI newDatabaseURI = uriInfo.getBaseUriBuilder() + 
.path(DatabasesResource.class) + .path(DatabasesResource.class, "getDatabase") + .build(request.getDatabaseName()); + return Response.created(newDatabaseURI).build(); + } else { + return ResourcesUtil.createExceptionResponse(LOG, "Failed to create a new database."); + } + } + } + + /** + * + * @param databaseName + * @return + */ + @GET + @Path("/{databaseName}") + @Produces(MediaType.APPLICATION_JSON) + public Response getDatabase(@PathParam("databaseName") String databaseName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a getDatabase request."); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetDatabaseDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetDatabaseDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + + if (databaseName.isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "DatabaseName is empty string."); + } + + CatalogService catalogService = masterContext.getCatalog(); + List<DatabaseProto> databasesList = catalogService.getAllDatabases(); + DatabaseProto 
selectedDatabase = null; + for (DatabaseProto database: databasesList) { + if (database.getName().equals(databaseName)) { + selectedDatabase = database; + break; + } + } + + if (selectedDatabase != null) { + List<TablespaceProto> tablespacesList = catalogService.getAllTablespaces(); + TablespaceProto selectedTablespace = null; + + for (TablespaceProto tablespace: tablespacesList) { + if (tablespace.hasId() && tablespace.getId() == selectedDatabase.getSpaceId()) { + selectedTablespace = tablespace; + break; + } + } + + DatabaseInfoResponse databaseInfo = new DatabaseInfoResponse(); + databaseInfo.setId(selectedDatabase.getId()); + databaseInfo.setName(selectedDatabase.getName()); + databaseInfo.setTablespace(selectedTablespace.getUri()); + return Response.ok(databaseInfo).build(); + } else { + return Response.status(Response.Status.NOT_FOUND).build(); + } + } + } + + /** + * + * @param databaseName + * @return + */ + @DELETE + @Path("/{databaseName}") + public Response deleteDatabase(@PathParam("databaseName") String databaseName) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a delete database request."); + } + + Response response = null; + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + + return JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new DeleteDatabaseDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class DeleteDatabaseDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, 
MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + + if (databaseName.isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "DatabaseName is empty string."); + } + + CatalogService catalogService = masterContext.getCatalog(); + + if (!catalogService.existDatabase(databaseName)) { + return Response.status(Status.NOT_FOUND).build(); + } + + boolean databaseDropped = catalogService.dropDatabase(databaseName); + + if (databaseDropped) { + return Response.ok().build(); + } else { + return ResourcesUtil.createExceptionResponse(LOG, "Unable to drop a database " + databaseName); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java new file mode 100644 index 0000000..9545b1a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.ws.rs.resources;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.ws.rs.JerseyResourceDelegate;
import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
import org.apache.tajo.ws.rs.ResourcesUtil;

/**
 * REST resource mounted at {@code /databases/{databaseName}/functions} that lists function signatures
 * from the catalog.
 */
@Path("/databases/{databaseName}/functions")
public class FunctionsResource {

  // Must name this class, not a sibling resource, so log lines are attributed correctly.
  private static final Log LOG = LogFactory.getLog(FunctionsResource.class);

  @Context
  UriInfo uriInfo;

  @Context
  Application application;

  @PathParam("databaseName")
  String databaseName;

  /** Delegate context; rebuilt by {@link #initializeContext()} on every request. */
  JerseyResourceDelegateContext context;

  private static final String databaseNameKeyName = "databaseName";

  /**
   * Replaces {@link #context} with a fresh one that holds the injected {@link UriInfo}.
   */
  private void initializeContext() {
    context = new JerseyResourceDelegateContext();
    JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
        JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class);
    context.put(uriInfoKey, uriInfo);
  }

  /**
   * Handles {@code GET /databases/{databaseName}/functions}.
   *
   * @return the delegate's response, or the response built by
   *         {@code ResourcesUtil.createExceptionResponse} when anything is thrown on the way
   */
  @GET
  @Produces(MediaType.APPLICATION_JSON)
  public Response getAllFunctions() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Client sent a retrieve all functions request.");
    }

    Response response = null;
    try {
      initializeContext();
      // The database name is stored in the context, but GetAllFunctionsDelegate does not read it.
      JerseyResourceDelegateContextKey<String> databaseNameKey =
          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class);
      context.put(databaseNameKey, databaseName);

      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
          new GetAllFunctionsDelegate(),
          application,
          context,
          LOG);
    } catch (Throwable e) {
      LOG.error(e.getMessage(), e);

      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
    }
    return response;
  }

  /**
   * Delegate that collects the signature of every function descriptor in the catalog.
   */
  private static class GetAllFunctionsDelegate implements JerseyResourceDelegate {

    /**
     * @param context delegate context that must contain the master context
     * @return a 200 response with the list of signatures, or 404 when the catalog returns no functions
     */
    @Override
    public Response run(JerseyResourceDelegateContext context) {
      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
          JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class);
      MasterContext masterContext = context.get(masterContextKey);

      Collection<FunctionDesc> functionDescriptors = masterContext.getCatalog().getFunctions();
      if (functionDescriptors.isEmpty()) {
        return Response.status(Status.NOT_FOUND).build();
      }

      List<FunctionSignature> functionSignature =
          new ArrayList<FunctionSignature>(functionDescriptors.size());
      for (FunctionDesc functionDesc : functionDescriptors) {
        functionSignature.add(functionDesc.getSignature());
      }

      return Response.ok(functionSignature).build();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --git
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java new file mode 100644 index 0000000..99609d7 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.resources; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriInfo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.ipc.ClientProtos; +import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; +import org.apache.tajo.master.QueryInProgress; +import org.apache.tajo.master.QueryInfo; +import org.apache.tajo.master.QueryManager; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.querymaster.QueryJobEvent; +import org.apache.tajo.session.InvalidSessionException; +import org.apache.tajo.session.Session; +import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.ws.rs.JerseyResourceDelegate; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContext; +import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey; +import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil; +import org.apache.tajo.ws.rs.ResourcesUtil; +import org.apache.tajo.ws.rs.requests.SubmitQueryRequest; + +@Path("/databases/{databaseName}/queries") +public class QueryResource { + + private static final Log LOG = LogFactory.getLog(QueryResource.class); + + @Context + UriInfo uriInfo; + + @Context + Application application; + + @PathParam("databaseName") + String databaseName; + + JerseyResourceDelegateContext context; + + protected static final String 
tajoSessionIdHeaderName = "X-Tajo-Session"; + + private static final String databaseNameKeyName = "databaseName"; + private static final String stateKeyName = "state"; + private static final String startTimeKeyName = "startTime"; + private static final String endTimeKeyName = "endTime"; + private static final String sessionIdKeyName = "sessionId"; + private static final String submitQueryRequestKeyName = "submitQueryRequest"; + private static final String printTypeKeyName = "printType"; + private static final String queryIdKeyName = "queryId"; + + private void initializeContext() { + context = new JerseyResourceDelegateContext(); + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + context.put(uriInfoKey, uriInfo); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + context.put(databaseNameKey, databaseName); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response getAllQueries(@QueryParam("state") String state, + @QueryParam("startTime") long startTime, + @QueryParam("endTime") long endTime) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get all queries request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> stateKey = + JerseyResourceDelegateContextKey.valueOf(stateKeyName, String.class); + if (state != null && !state.isEmpty()) { + context.put(stateKey, state); + } + JerseyResourceDelegateContextKey<Long> startTimeKey = + JerseyResourceDelegateContextKey.valueOf(startTimeKeyName, Long.class); + if (startTime > 0) { + context.put(startTimeKey, startTime); + } + JerseyResourceDelegateContextKey<Long> endTimeKey = + JerseyResourceDelegateContextKey.valueOf(endTimeKeyName, Long.class); + if (endTime > 0) { + context.put(endTimeKey, endTime); + } + + response = 
JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetAllQueriesDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetAllQueriesDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> stateKey = + JerseyResourceDelegateContextKey.valueOf(stateKeyName, String.class); + String state = context.get(stateKey); + JerseyResourceDelegateContextKey<Long> startTimeKey = + JerseyResourceDelegateContextKey.valueOf(startTimeKeyName, Long.class); + long startTime = context.get(startTimeKey); + JerseyResourceDelegateContextKey<Long> endTimeKey = + JerseyResourceDelegateContextKey.valueOf(endTimeKeyName, Long.class); + long endTime = context.get(endTimeKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + TajoProtos.QueryState queryState = null; + try { + if (state != null && !state.isEmpty()) { + queryState = TajoProtos.QueryState.valueOf(state); + } + } catch (Exception e) { + return ResourcesUtil.createBadRequestResponse(LOG, state + " is not a valid query state."); + } + + Map<String, List<QueryInfo>> queriesMap = new HashMap<String, List<QueryInfo>>(); + List<QueryInfo> queriesInfo = new ArrayList<QueryInfo>(); + + QueryManager queryManager = masterContext.getQueryJobManager(); + for (QueryInProgress queryInProgress: queryManager.getSubmittedQueries()) { + queriesInfo.add(queryInProgress.getQueryInfo()); + } + + for (QueryInProgress queryInProgress: queryManager.getRunningQueries()) { + queriesInfo.add(queryInProgress.getQueryInfo()); + } + + try { + 
queriesInfo.addAll(masterContext.getHistoryReader().getQueries(null)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + return ResourcesUtil.createExceptionResponse(LOG, e.getMessage()); + } + + if (state != null) { + queriesInfo = selectQueriesInfoByState(queriesInfo, queryState); + } + + if (startTime > 0 || endTime > 0) { + queriesInfo = selectQueriesInfoByTime(queriesInfo, startTime, endTime); + } + queriesMap.put("queries", queriesInfo); + + return Response.ok(queriesMap).build(); + } + + private List<QueryInfo> selectQueriesInfoByState(List<QueryInfo> queriesInfo, TajoProtos.QueryState state) { + List<QueryInfo> resultQueriesInfo = new ArrayList<QueryInfo>(queriesInfo.size()/2); + + for (QueryInfo queryInfo: queriesInfo) { + if (state.equals(queryInfo.getQueryState())) { + resultQueriesInfo.add(queryInfo); + } + } + + return resultQueriesInfo; + } + + private List<QueryInfo> selectQueriesInfoByTime(List<QueryInfo> queriesInfo, long startTime, long endTime) { + List<QueryInfo> resultQueriesInfo = new ArrayList<QueryInfo>(queriesInfo.size()/2); + + for (QueryInfo queryInfo: queriesInfo) { + if (queryInfo.getStartTime() > startTime) { + resultQueriesInfo.add(queryInfo); + } + if (queryInfo.getStartTime() < endTime) { + resultQueriesInfo.add(queryInfo); + } + } + + return resultQueriesInfo; + } + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + public Response submitQuery(@HeaderParam(tajoSessionIdHeaderName) String sessionId, + SubmitQueryRequest request) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a submit query request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + context.put(sessionIdKey, sessionId); + JerseyResourceDelegateContextKey<SubmitQueryRequest> submitQueryRequestKey = + JerseyResourceDelegateContextKey.valueOf(submitQueryRequestKeyName, SubmitQueryRequest.class); 
+ context.put(submitQueryRequestKey, request); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new SubmitQueryDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class SubmitQueryDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> sessionIdKey = + JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, String.class); + String sessionId = context.get(sessionIdKey); + JerseyResourceDelegateContextKey<SubmitQueryRequest> submitQueryRequestKey = + JerseyResourceDelegateContextKey.valueOf(submitQueryRequestKeyName, SubmitQueryRequest.class); + SubmitQueryRequest request = context.get(submitQueryRequestKey); + JerseyResourceDelegateContextKey<String> databaseNameKey = + JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, String.class); + String databaseName = context.get(databaseNameKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + if (sessionId == null || sessionId.isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "Session Id is null or empty string."); + } + if (request == null || request.getQuery() == null || request.getQuery().isEmpty()) { + return ResourcesUtil.createBadRequestResponse(LOG, "query is null or emptry string."); + } + + Session session; + try { + session = masterContext.getSessionManager().getSession(sessionId); + } catch (InvalidSessionException e) { + return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid."); + } + + SubmitQueryResponse response = + 
masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false); + if (response.hasResultCode() && ClientProtos.ResultCode.ERROR.equals(response.getResultCode())) { + return ResourcesUtil.createExceptionResponse(LOG, response.getErrorMessage()); + } else { + JerseyResourceDelegateContextKey<UriInfo> uriInfoKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, UriInfo.class); + UriInfo uriInfo = context.get(uriInfoKey); + + URI queryURI = uriInfo.getBaseUriBuilder() + .path(QueryResource.class) + .path(QueryResource.class, "getQuery") + .build(databaseName, new QueryId(response.getQueryId()).toString()); + return Response.created(queryURI).build(); + } + } + } + + @GET + @Path("{queryId}") + @Produces(MediaType.APPLICATION_JSON) + public Response getQuery(@PathParam("queryId") String queryId, @QueryParam("print") String printType) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a get query request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + context.put(queryIdKey, queryId); + JerseyResourceDelegateContextKey<String> printTypeKey = + JerseyResourceDelegateContextKey.valueOf(printTypeKeyName, String.class); + context.put(printTypeKey, printType); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new GetQueryDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class GetQueryDelegate implements JerseyResourceDelegate { + + private static final String briefPrint = "BRIEF"; + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, 
String.class); + String queryId = context.get(queryIdKey); + JerseyResourceDelegateContextKey<String> printTypeKey = + JerseyResourceDelegateContextKey.valueOf(printTypeKeyName, String.class); + String printType = context.get(printTypeKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId); + + QueryManager queryManager = masterContext.getQueryJobManager(); + QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryIdObj); + + QueryInfo queryInfo = null; + if (queryInProgress == null) { + queryInfo = queryManager.getFinishedQuery(queryIdObj); + } else { + queryInfo = queryInProgress.getQueryInfo(); + } + + if (queryInfo != null) { + if (briefPrint.equalsIgnoreCase(printType)) { + queryInfo = getBriefQueryInfo(queryInfo); + } + return Response.ok(queryInfo).build(); + } else { + return Response.status(Status.NOT_FOUND).build(); + } + } + + private QueryInfo getBriefQueryInfo(QueryInfo queryInfo) { + QueryInfo newQueryInfo = new QueryInfo(queryInfo.getQueryId(), null, null, null); + newQueryInfo.setQueryState(queryInfo.getQueryState()); + newQueryInfo.setStartTime(queryInfo.getStartTime()); + return newQueryInfo; + } + } + + @DELETE + @Path("{queryId}") + public Response terminateQuery(@PathParam("queryId") String queryId) { + if (LOG.isDebugEnabled()) { + LOG.debug("Client sent a terminate query request."); + } + + Response response = null; + + try { + initializeContext(); + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + context.put(queryIdKey, queryId); + + response = JerseyResourceDelegateUtil.runJerseyResourceDelegate( + new TerminateQueryDelegate(), + application, + context, + LOG); + } catch (Throwable e) { + 
LOG.error(e.getMessage(), e); + + response = ResourcesUtil.createExceptionResponse(null, e.getMessage()); + } + + return response; + } + + private static class TerminateQueryDelegate implements JerseyResourceDelegate { + + @Override + public Response run(JerseyResourceDelegateContext context) { + JerseyResourceDelegateContextKey<String> queryIdKey = + JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, String.class); + String queryId = context.get(queryIdKey); + JerseyResourceDelegateContextKey<MasterContext> masterContextKey = + JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey, MasterContext.class); + MasterContext masterContext = context.get(masterContextKey); + + QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId); + + QueryManager queryManager = masterContext.getQueryJobManager(); + queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, + new QueryInfo(queryIdObj))); + return Response.ok().build(); + } + } + + @Path("/{queryId}/result") + public QueryResultResource getQueryResult(@PathParam("queryId") String queryId) { + QueryResultResource queryResultResource = new QueryResultResource(); + queryResultResource.setUriInfo(uriInfo); + queryResultResource.setApplication(application); + queryResultResource.setDatabaseName(databaseName); + queryResultResource.setQueryId(queryId); + return queryResultResource; + } +}
