TAJO-1338: Defines RESTful API for Clients

Closes #490


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/32b524d7
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/32b524d7
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/32b524d7

Branch: refs/heads/master
Commit: 32b524d74f5e4a6a37a4b99c43830f8401fc9e45
Parents: 696d2aa
Author: Jihun Kang <[email protected]>
Authored: Mon Apr 6 11:48:46 2015 +0900
Committer: Jihun Kang <[email protected]>
Committed: Mon Apr 6 11:48:46 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/catalog/TableDesc.java |   1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +
 tajo-core/pom.xml                               |   8 +
 .../java/org/apache/tajo/master/TajoMaster.java |  17 +
 .../exec/NonForwardQueryResultFileScanner.java  |   5 +
 .../exec/NonForwardQueryResultScanner.java      |   2 +
 .../NonForwardQueryResultSystemScanner.java     |   5 +
 .../apache/tajo/ws/rs/ClientApplication.java    | 179 ++++++++
 .../tajo/ws/rs/JerseyResourceDelegate.java      |  31 ++
 .../ws/rs/JerseyResourceDelegateContext.java    |  64 +++
 .../ws/rs/JerseyResourceDelegateContextKey.java |  87 ++++
 .../tajo/ws/rs/JerseyResourceDelegateUtil.java  |  61 +++
 .../org/apache/tajo/ws/rs/ResourcesUtil.java    |  47 ++
 .../org/apache/tajo/ws/rs/TajoRestService.java  | 136 ++++++
 .../tajo/ws/rs/requests/NewDatabaseRequest.java |  34 ++
 .../tajo/ws/rs/requests/NewSessionRequest.java  |  49 ++
 .../tajo/ws/rs/requests/SubmitQueryRequest.java |  34 ++
 .../tajo/ws/rs/resources/ClusterResource.java   | 117 +++++
 .../tajo/ws/rs/resources/DatabasesResource.java | 339 ++++++++++++++
 .../tajo/ws/rs/resources/FunctionsResource.java | 121 +++++
 .../tajo/ws/rs/resources/QueryResource.java     | 448 +++++++++++++++++++
 .../ws/rs/resources/QueryResultResource.java    | 415 +++++++++++++++++
 .../tajo/ws/rs/resources/SessionsResource.java  | 380 ++++++++++++++++
 .../tajo/ws/rs/resources/TablesResource.java    | 350 +++++++++++++++
 .../ws/rs/responses/DatabaseInfoResponse.java   |  48 ++
 .../tajo/ws/rs/responses/ExceptionResponse.java |  33 ++
 .../responses/GetQueryResultDataResponse.java   |  72 +++
 .../ws/rs/responses/NewSessionResponse.java     |  76 ++++
 .../ws/rs/responses/ResultSetInfoResponse.java  |  43 ++
 .../responses/WorkerConnectionInfoResponse.java |  67 +++
 .../tajo/ws/rs/responses/WorkerResponse.java    |  76 ++++
 .../org/apache/tajo/TajoTestingCluster.java     |   5 +
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |   2 +
 .../tajo/ws/rs/resources/RestTestUtils.java     |  64 +++
 .../ws/rs/resources/TestClusterResource.java    |  84 ++++
 .../ws/rs/resources/TestDatabasesResource.java  | 189 ++++++++
 .../ws/rs/resources/TestFunctionsResource.java  |  78 ++++
 .../tajo/ws/rs/resources/TestQueryResource.java | 170 +++++++
 .../rs/resources/TestQueryResultResource.java   | 287 ++++++++++++
 .../ws/rs/resources/TestSessionsResource.java   | 263 +++++++++++
 .../ws/rs/resources/TestTablesResource.java     | 195 ++++++++
 tajo-project/pom.xml                            |   5 +
 tajo-rpc/tajo-ws-rs/pom.xml                     |   4 +
 .../apache/tajo/ws/rs/ResourceConfigUtil.java   |  38 ++
 .../tajo/ws/rs/netty/gson/GsonFeature.java      |  25 +-
 .../tajo/ws/rs/netty/gson/GsonReader.java       |  23 +-
 .../apache/tajo/ws/rs/netty/gson/GsonUtil.java  |   2 +-
 .../tajo/ws/rs/netty/gson/GsonWriter.java       |  23 +-
 .../tajo/ws/rs/netty/testapp2/Directory.java    |   8 +-
 50 files changed, 4807 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4ec678f..efadea8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -155,6 +155,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1338: Defines RESTful API for Clients. (jihun)
+
     TAJO-1284: Add alter partition method to CatalogStore. (jaehwa)
 
     TAJO-1392: Resolve findbug warnings on Tajo Plan Module. (jihun)

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index ec679f9..ee096a2 100644
--- 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -22,6 +22,7 @@ import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 221b341..e892dc9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -135,6 +135,9 @@ public class TajoConf extends Configuration {
         Validators.networkAddr()),
     TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", 
Validators.networkAddr()),
 
+    // Tajo Rest Service
+    REST_SERVICE_PORT("tajo.rest.service.port", 26880),
+
     // High availability configurations
     TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
     TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 
1000), // 5 sec

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 61a156b..19c9ba3 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -283,6 +283,10 @@
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-thirdparty-asm</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-ws-rs</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -368,6 +372,10 @@
       <artifactId>antlr4</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 371dfb4..51f82f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -59,6 +59,7 @@ import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.QueryExecutorServlet;
 import org.apache.tajo.webapp.StaticHttpServer;
+import org.apache.tajo.ws.rs.TajoRestService;
 
 import java.io.*;
 import java.lang.management.ManagementFactory;
@@ -119,6 +120,7 @@ public class TajoMaster extends CompositeService {
   private WorkerResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
+  private TajoRestService restServer;
 
   private QueryManager queryManager;
 
@@ -193,6 +195,9 @@ public class TajoMaster extends CompositeService {
 
       tajoMasterService = new QueryCoordinatorService(context);
       addIfService(tajoMasterService);
+      
+      restServer = new TajoRestService(context);
+      addIfService(restServer);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw e;
@@ -375,6 +380,14 @@ public class TajoMaster extends CompositeService {
         LOG.error(e, e);
       }
     }
+    
+    if (restServer != null) {
+      try {
+        restServer.stop();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
 
     if (webServer != null) {
       try {
@@ -474,6 +487,10 @@ public class TajoMaster extends CompositeService {
     public HistoryReader getHistoryReader() {
       return historyReader;
     }
+    
+    public TajoRestService getRestServer() {
+      return restServer;
+    }
   }
 
   String getThreadTaskName(long id, String name) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 6c02aa9..804821b 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -143,6 +143,11 @@ public class NonForwardQueryResultFileScanner implements 
NonForwardQueryResultSc
     return tableDesc;
   }
 
+  @Override
+  public int getCurrentRowNumber() {
+    return currentNumRows;
+  }
+
   public void close() throws Exception {
     if (scanExec != null) {
       scanExec.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
index 86d2843..75608df 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultScanner.java
@@ -43,4 +43,6 @@ public interface NonForwardQueryResultScanner {
 
   public void init() throws IOException;
 
+  public int getCurrentRowNumber();
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 80bdb86..958c252 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -620,6 +620,11 @@ public class NonForwardQueryResultSystemScanner implements 
NonForwardQueryResult
   public Schema getLogicalSchema() {
     return outSchema;
   }
+
+  @Override
+  public int getCurrentRowNumber() {
+    return currentRow;
+  }
   
   class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java
new file mode 100644
index 0000000..943f356
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
+import org.apache.tajo.ws.rs.resources.ClusterResource;
+import org.apache.tajo.ws.rs.resources.DatabasesResource;
+import org.apache.tajo.ws.rs.resources.FunctionsResource;
+import org.apache.tajo.ws.rs.resources.QueryResource;
+import org.apache.tajo.ws.rs.resources.SessionsResource;
+import org.apache.tajo.ws.rs.resources.TablesResource;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import javax.ws.rs.core.Application;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * It loads client classes for Tajo REST protocol.
+ */
+public class ClientApplication extends Application {
+
+  private final MasterContext masterContext;
+  private final ConcurrentMap<QueryId, Long> queryIdToResultSetCacheIdMap;
+  private final Cache<Long, NonForwardQueryResultScanner> 
queryResultScannerCache;
+  
+  private final SecureRandom secureRandom;
+
+  public ClientApplication(MasterContext masterContext) {
+    this.masterContext = masterContext;
+    
+    this.secureRandom = new SecureRandom();
+    
+    this.queryIdToResultSetCacheIdMap = new ConcurrentHashMap<QueryId, Long>();
+    this.queryResultScannerCache = CacheBuilder.newBuilder()
+        .concurrencyLevel(4)
+        .maximumSize(1000)
+        .expireAfterAccess(30, TimeUnit.MINUTES)
+        .build();
+  }
+
+  @Override
+  public Set<Class<?>> getClasses() {
+    Set<Class<?>> classes = new HashSet<Class<?>>();
+    
+    classes.add(SessionsResource.class);
+    classes.add(DatabasesResource.class);
+    classes.add(TablesResource.class);
+    classes.add(FunctionsResource.class);
+    classes.add(ClusterResource.class);
+    classes.add(QueryResource.class);
+    
+    return classes;
+  }
+
+  public MasterContext getMasterContext() {
+    return masterContext;
+  }
+  
+  /**
+   * It returns generated 8-byte size integer.
+   * 
+   * @return
+   */
+  private long generateCacheId() {
+    byte[] generatedBytes = new byte[8];
+    long generatedId = 0;
+    
+    secureRandom.nextBytes(generatedBytes);
+    for (byte generatedByte: generatedBytes) {
+      generatedId = (generatedId << 8) + (generatedByte & 0xff);
+    }
+    
+    return generatedId;
+  }
+  
+  /**
+   * If cannot find any cache id for supplied query id, it will generate a new 
cache id.
+   * 
+   * @param queryId
+   * @return
+   */
+  public long generateCacheIdIfAbsent(QueryId queryId) {
+    Long cacheId = this.queryIdToResultSetCacheIdMap.get(queryId);
+    long newCacheId = 0;
+    
+    if (cacheId == null) {
+      boolean generated = false;
+      do {
+        newCacheId = generateCacheId();
+        if (queryResultScannerCache.getIfPresent(newCacheId) == null) {
+          generated = true;
+        }
+      } while (!generated);
+      cacheId = this.queryIdToResultSetCacheIdMap.putIfAbsent(queryId, 
newCacheId);
+      if (cacheId != null) {
+        newCacheId = cacheId.longValue();
+      }
+    } else {
+      newCacheId = cacheId.longValue();
+    }
+    
+    return newCacheId;
+  }
+  
+  /**
+   * get cached NonForwardResultScanner instance by query id and cache id.
+   * 
+   * @param queryId
+   * @param cacheId
+   * @return
+   */
+  public NonForwardQueryResultScanner getCachedNonForwardResultScanner(QueryId 
queryId, long cacheId) {
+    Long cachedCacheId = queryIdToResultSetCacheIdMap.get(queryId);
+    
+    if (cachedCacheId == null) {
+      throw new RuntimeException("Supplied cache id " + cacheId + " was 
expired or invalid.");
+    }
+    
+    if (cacheId != cachedCacheId.longValue()) {
+      throw new RuntimeException("Supplied cache id " + cacheId + " was 
expired or invalid. " +
+          "Please use the valid cache id.");
+    }
+    
+    return queryResultScannerCache.getIfPresent(cachedCacheId);
+  }
+  
+  /**
+   * Store NonForwardResultScanner instance to cached memory if not present.
+   * 
+   * @param queryId
+   * @param cacheId
+   * @param resultScanner
+   */
+  public NonForwardQueryResultScanner setCachedNonForwardResultScanner(QueryId 
queryId, long cacheId, 
+      NonForwardQueryResultScanner resultScanner) {
+    NonForwardQueryResultScanner cachedScanner = null;
+    
+    if (cacheId == 0) {
+      cacheId = generateCacheIdIfAbsent(queryId);
+    }
+    
+    cachedScanner = getCachedNonForwardResultScanner(queryId, cacheId);
+    if (cachedScanner == null) {
+      cachedScanner = 
this.queryResultScannerCache.asMap().putIfAbsent(cacheId, resultScanner);
+      if (cachedScanner == null) {
+        cachedScanner = resultScanner;
+      }
+    }
+    
+    return cachedScanner;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java
new file mode 100644
index 0000000..89f7a28
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegate.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import javax.ws.rs.core.Response;
+
+/**
+ * Implements business flows for jersey resource
+ * 
+ */
+public interface JerseyResourceDelegate {
+
+  public Response run(JerseyResourceDelegateContext context);
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java
new file mode 100644
index 0000000..9f1fb4a
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContext.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * It holds variables for running deletegates
+ */
+public class JerseyResourceDelegateContext {
+
+  private final ConcurrentMap<JerseyResourceDelegateContextKey<?>, Object> 
contextMap =
+      new ConcurrentHashMap<JerseyResourceDelegateContextKey<?>, Object>();
+  
+  /**
+   * Add value to Context. If value exists, it will overwrite.
+   * 
+   * @param key
+   * @param value
+   * @return
+   */
+  public <T> JerseyResourceDelegateContext 
put(JerseyResourceDelegateContextKey<T> key, T value) {
+    contextMap.put(key, value);
+    return this;
+  }
+  
+  public <T> T get(JerseyResourceDelegateContextKey<T> key) {
+    Class<T> keyTypeClass = key.getType();
+    Object object = contextMap.get(key);
+    if (object != null) {
+      return keyTypeClass.cast(contextMap.get(key));
+    } else {
+      if (Boolean.class.isAssignableFrom(keyTypeClass)) {
+        return keyTypeClass.cast(false);
+      } else if (Number.class.isAssignableFrom(keyTypeClass)) {
+        try {
+          return keyTypeClass.getConstructor(String.class).newInstance("0");
+        } catch (Throwable e) {
+          return null;
+        }
+      } else {
+        return null;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java
new file mode 100644
index 0000000..3c059d4
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateContextKey.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.ws.rs.core.GenericType;
+
+public class JerseyResourceDelegateContextKey<T> {
+  
+  private static final ConcurrentMap<String, 
JerseyResourceDelegateContextKey<?>> keyMap =
+      new ConcurrentHashMap<String, JerseyResourceDelegateContextKey<?>>();
+  
+  private final String name;
+  private final Class<T> type;
+  
+  private JerseyResourceDelegateContextKey(String name, Class<T> type) {
+    this.name = name;
+    this.type = type;
+  }
+  
+  public static <T> JerseyResourceDelegateContextKey<T> valueOf(String name, 
Class<T> type) {
+    if (name == null || name.isEmpty()) {
+      throw new RuntimeException("name cannnot be null or empty.");
+    }
+    
+    JerseyResourceDelegateContextKey<T> key = 
(JerseyResourceDelegateContextKey<T>) keyMap.get(name);
+    if (key == null) {
+      key = new JerseyResourceDelegateContextKey<T>(name, type);
+      keyMap.putIfAbsent(name, key);
+    }
+    
+    return key;
+  }
+  
+  @SuppressWarnings("unchecked")
+  public static <T> JerseyResourceDelegateContextKey<T> valueOf(String name, 
GenericType<T> genericType) {
+    return (JerseyResourceDelegateContextKey<T>) valueOf(name, 
genericType.getRawType());
+  }
+  
+  public Class<T> getType() {
+    return type;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((name == null) ? 0 : name.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    JerseyResourceDelegateContextKey other = 
(JerseyResourceDelegateContextKey) obj;
+    if (name == null) {
+      if (other.name != null)
+        return false;
+    } else if (!name.equals(other.name))
+      return false;
+    return true;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java
new file mode 100644
index 0000000..365f3b8
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/JerseyResourceDelegateUtil.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.logging.Log;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.ws.rs.ResourcesUtil;
+
+public class JerseyResourceDelegateUtil {
+  
+  public static final String ClientApplicationKey = "ClientApplication";
+  public static final String MasterContextKey = "MasterContextKey";
+  public static final String UriInfoKey = "UriInfoKey";
+
+  public static Response runJerseyResourceDelegate(JerseyResourceDelegate 
delegate, 
+      Application application,
+      JerseyResourceDelegateContext context,
+      Log log) {
+    Application localApp = ResourceConfigUtil.getJAXRSApplication(application);
+    
+    if ((localApp != null) && (localApp instanceof ClientApplication)) {
+      ClientApplication clientApplication = (ClientApplication) localApp;
+      JerseyResourceDelegateContextKey<ClientApplication> clientApplicationKey 
=
+          JerseyResourceDelegateContextKey.valueOf(ClientApplicationKey, 
ClientApplication.class);
+      context.put(clientApplicationKey, clientApplication);
+      
+      MasterContext masterContext = clientApplication.getMasterContext();
+      
+      if (masterContext != null) {
+        JerseyResourceDelegateContextKey<MasterContext> key = 
+            JerseyResourceDelegateContextKey.valueOf(MasterContextKey, 
MasterContext.class);
+        context.put(key, masterContext);
+      
+        return delegate.run(context);
+      } else {
+        return ResourcesUtil.createExceptionResponse(log, "MasterContext is 
null.");
+      }
+    } else {
+      return ResourcesUtil.createExceptionResponse(log, "Invalid injection on 
SessionsResource.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java
new file mode 100644
index 0000000..5794636
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/ResourcesUtil.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import org.apache.commons.logging.Log;
+import org.apache.tajo.ws.rs.responses.ExceptionResponse;
+
+import javax.ws.rs.core.Response;
+
+public class ResourcesUtil {
+
+  public static Response createExceptionResponse(Log log, String message) {
+    if (log != null) {
+      log.error(message);
+    }
+
+    ExceptionResponse response = new ExceptionResponse();
+    response.setMessage(message);
+    return 
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(response).build();
+  }
+  
+  public static Response createBadRequestResponse(Log log, String message) {
+    if (log != null) {
+      log.error(message);
+    }
+    
+    ExceptionResponse response = new ExceptionResponse();
+    response.setMessage(message);
+    return 
Response.status(Response.Status.BAD_REQUEST).entity(response).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
new file mode 100644
index 0000000..6da2b32
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/TajoRestService.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.json.FunctionAdapter;
+import org.apache.tajo.catalog.json.TableMetaAdapter;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.function.Function;
+import org.apache.tajo.json.ClassNameSerializer;
+import org.apache.tajo.json.DataTypeAdapter;
+import org.apache.tajo.json.DatumAdapter;
+import org.apache.tajo.json.GsonSerDerAdapter;
+import org.apache.tajo.json.PathSerializer;
+import org.apache.tajo.json.TimeZoneGsonSerdeAdapter;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.function.AggFunction;
+import org.apache.tajo.plan.function.GeneralFunction;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.serder.EvalNodeAdapter;
+import org.apache.tajo.plan.serder.LogicalNodeAdapter;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.ws.rs.netty.NettyRestServer;
+import org.apache.tajo.ws.rs.netty.NettyRestServerFactory;
+import org.apache.tajo.ws.rs.netty.gson.GsonFeature;
+import org.glassfish.jersey.filter.LoggingFilter;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.ServerProperties;
+
+import java.lang.reflect.Type;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.TimeZone;
+
+public class TajoRestService extends CompositeService {
+  
+  private final Log LOG = LogFactory.getLog(getClass());
+
+  private MasterContext masterContext;
+  private NettyRestServer restServer;
+
+  public TajoRestService(MasterContext masterContext) {
+    super(TajoRestService.class.getName());
+    
+    this.masterContext = masterContext;
+  }
+  
+  private Map<Type, GsonSerDerAdapter<?>> registerTypeAdapterMap() {
+    Map<Type, GsonSerDerAdapter<?>> adapters = TUtil.newHashMap();
+    adapters.put(Path.class, new PathSerializer());
+    adapters.put(Class.class, new ClassNameSerializer());
+    adapters.put(LogicalNode.class, new LogicalNodeAdapter());
+    adapters.put(EvalNode.class, new EvalNodeAdapter());
+    adapters.put(TableMeta.class, new TableMetaAdapter());
+    adapters.put(Function.class, new FunctionAdapter());
+    adapters.put(GeneralFunction.class, new FunctionAdapter());
+    adapters.put(AggFunction.class, new FunctionAdapter());
+    adapters.put(Datum.class, new DatumAdapter());
+    adapters.put(DataType.class, new DataTypeAdapter());
+    adapters.put(TimeZone.class, new TimeZoneGsonSerdeAdapter());
+
+    return adapters;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    GsonFeature gsonFeature = new GsonFeature(registerTypeAdapterMap());
+    
+    ClientApplication clientApplication = new ClientApplication(masterContext);
+    ResourceConfig resourceConfig = 
ResourceConfig.forApplication(clientApplication)
+        .register(gsonFeature)
+        .register(LoggingFilter.class)
+        .property(ServerProperties.FEATURE_AUTO_DISCOVERY_DISABLE, true)
+        .property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true);
+    TajoConf tajoConf = (TajoConf) conf;
+
+    int port = TajoConf.getIntVar(tajoConf, 
TajoConf.ConfVars.REST_SERVICE_PORT);
+    URI restServiceURI = new URI("http", null, "0.0.0.0", port, "/rest", null, 
null);
+    int workerCount = TajoConf.getIntVar(tajoConf, 
TajoConf.ConfVars.REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+    restServer = NettyRestServerFactory.createNettyRestServer(restServiceURI, 
resourceConfig, workerCount, false);
+
+    super.serviceInit(conf);
+    
+    LOG.info("Tajo Rest Service initialized.");
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    restServer.start();
+
+    super.serviceStart();
+    
+    LOG.info("Tajo Rest Service started.");
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+
+    if (restServer != null) {
+      restServer.shutdown();
+    }
+    
+    LOG.info("Tajo Rest Service stopped.");
+  }
+  
+  public InetSocketAddress getBindAddress() {
+    return restServer.getListenAddress();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java
new file mode 100644
index 0000000..85f68d5
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewDatabaseRequest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.requests;
+
+import com.google.gson.annotations.Expose;
+
+public class NewDatabaseRequest {
+  @Expose private String databaseName;
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java
new file mode 100644
index 0000000..8069179
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/NewSessionRequest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.requests;
+
+import com.google.gson.annotations.Expose;
+
+public class NewSessionRequest {
+
+  @Expose private String userName;
+  @Expose private String databaseName;
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  @Override
+  public String toString() {
+    return "NewSessionRequest [userName=" + userName + ", databaseName=" + 
databaseName + "]";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java
new file mode 100644
index 0000000..0bf5678
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/requests/SubmitQueryRequest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.requests;
+
+import com.google.gson.annotations.Expose;
+
+public class SubmitQueryRequest {
+
+  @Expose private String query;
+  
+  public String getQuery() {
+    return query;
+  }
+  public void setQuery(String query) {
+    this.query = query;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
new file mode 100644
index 0000000..349fa4c
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/ClusterResource.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.ws.rs.JerseyResourceDelegate;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
+import org.apache.tajo.ws.rs.ResourcesUtil;
+import org.apache.tajo.ws.rs.responses.WorkerResponse;
+
+@Path("/cluster")
+public class ClusterResource {
+  
+  private static final Log LOG = LogFactory.getLog(ClusterResource.class);
+  
+  @Context
+  UriInfo uriInfo;
+  
+  @Context
+  Application application;
+  
+  JerseyResourceDelegateContext context;
+  
+  private static final String workersName = "workers";
+  
+  private void initializeContext() {
+    context = new JerseyResourceDelegateContext();
+    JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+        
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+    context.put(uriInfoKey, uriInfo);
+  }
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getClusterInfo() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a get cluster info request.");
+    }
+    
+    Response response = null;
+    
+    try {
+      initializeContext();
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetClusterInfoDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class GetClusterInfoDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      Map<Integer, Worker> workerMap = 
masterContext.getResourceManager().getWorkers();
+      List<WorkerResponse> workerList = new ArrayList<WorkerResponse>();
+      
+      for (Worker worker: workerMap.values()) {
+        workerList.add(new WorkerResponse(worker));
+      }
+      
+      Map<String, List<WorkerResponse>> workerResponseMap = new 
HashMap<String, List<WorkerResponse>>();
+      workerResponseMap.put(workersName, workerList);
+      
+      return Response.ok(workerResponseMap).build();
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
new file mode 100644
index 0000000..3868cab
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegate;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
+import org.apache.tajo.ws.rs.ResourcesUtil;
+import org.apache.tajo.ws.rs.requests.NewDatabaseRequest;
+import org.apache.tajo.ws.rs.responses.DatabaseInfoResponse;
+
+/**
+ * Deals with Database Management
+ */
+@Path("/databases")
+public class DatabasesResource {
+  
+  private static final Log LOG = LogFactory.getLog(DatabasesResource.class);
+  
+  @Context
+  UriInfo uriInfo;
+  
+  @Context
+  Application application;
+  
+  JerseyResourceDelegateContext context;
+  
+  private static final String databasesKeyName = "databases";
+  private static final String newDatabaseRequestKeyName = "NewDatabaseKey";
+  private static final String databaseNameKeyName = "databaseName";
+  
+  private void initializeContext() {
+    context = new JerseyResourceDelegateContext();
+    JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+        
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+    context.put(uriInfoKey, uriInfo);
+  }
+  
+  /**
+   * Get all databases from catalog server
+   * 
+   * @return
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getAllDatabases() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent retrieve all databases request.");
+    }
+    
+    Response response = null;
+    try {
+      initializeContext();
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetAllDatabasesDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class GetAllDatabasesDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      Collection<String> databaseNames = 
masterContext.getCatalog().getAllDatabaseNames();
+      Map<String, Collection<String>> databaseNamesMap = new HashMap<String, 
Collection<String>>();
+      databaseNamesMap.put(databasesKeyName, databaseNames);
+      return Response.ok(databaseNamesMap).build();
+    }
+  }
+  
+  /**
+   * 
+   * @param request
+   * @return
+   */
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response createNewDatabase(NewDatabaseRequest request) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a new database creation request");
+    }
+    
+    Response response = null;
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<NewDatabaseRequest> 
newDatabaseRequestKey =
+          JerseyResourceDelegateContextKey.valueOf(newDatabaseRequestKeyName, 
NewDatabaseRequest.class);
+      context.put(newDatabaseRequestKey, request);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new CreateNewDatabaseDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class CreateNewDatabaseDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<NewDatabaseRequest> 
newDatabaseRequestKey =
+          JerseyResourceDelegateContextKey.valueOf(newDatabaseRequestKeyName, 
NewDatabaseRequest.class);
+      NewDatabaseRequest request = context.get(newDatabaseRequestKey);
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+      UriInfo uriInfo = context.get(uriInfoKey);
+      
+      if (request.getDatabaseName() == null || 
request.getDatabaseName().isEmpty()) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "databaseName is 
null or empty.");
+      }
+      
+      boolean databaseCreated =
+          masterContext.getCatalog().createDatabase(request.getDatabaseName(), 
+              TajoConstants.DEFAULT_TABLESPACE_NAME);
+      
+      if (databaseCreated) {
+        URI newDatabaseURI = uriInfo.getBaseUriBuilder()
+            .path(DatabasesResource.class)
+            .path(DatabasesResource.class, "getDatabase")
+            .build(request.getDatabaseName());
+        return Response.created(newDatabaseURI).build();
+      } else {
+        return ResourcesUtil.createExceptionResponse(LOG, "Failed to create a 
new database.");
+      }
+    }
+  }
+  
+  /**
+   * 
+   * @param databaseName
+   * @return
+   */
+  @GET
+  @Path("/{databaseName}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getDatabase(@PathParam("databaseName") String databaseName) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a getDatabase request.");
+    }
+    
+    Response response = null;
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      context.put(databaseNameKey, databaseName);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetDatabaseDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class GetDatabaseDelegate implements JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      String databaseName = context.get(databaseNameKey);
+      
+      if (databaseName.isEmpty()) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "DatabaseName is 
empty string.");
+      }
+      
+      CatalogService catalogService = masterContext.getCatalog();
+      List<DatabaseProto> databasesList = catalogService.getAllDatabases();
+      DatabaseProto selectedDatabase = null;
+      for (DatabaseProto database: databasesList) {
+        if (database.getName().equals(databaseName)) {
+          selectedDatabase = database;
+          break;
+        }
+      }
+      
+      if (selectedDatabase != null) {
+        List<TablespaceProto> tablespacesList = 
catalogService.getAllTablespaces();
+        TablespaceProto selectedTablespace = null;
+        
+        for (TablespaceProto tablespace: tablespacesList) {
+          if (tablespace.hasId() && tablespace.getId() == 
selectedDatabase.getSpaceId()) {
+            selectedTablespace = tablespace;
+            break;
+          }
+        }
+        
+        DatabaseInfoResponse databaseInfo = new DatabaseInfoResponse();
+        databaseInfo.setId(selectedDatabase.getId());
+        databaseInfo.setName(selectedDatabase.getName());
+        databaseInfo.setTablespace(selectedTablespace.getUri());
+        return Response.ok(databaseInfo).build();
+      } else {
+        return Response.status(Response.Status.NOT_FOUND).build();
+      }
+    }
+  }
+  
+  /**
+   * 
+   * @param databaseName
+   * @return
+   */
+  @DELETE
+  @Path("/{databaseName}")
+  public Response deleteDatabase(@PathParam("databaseName") String 
databaseName) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a delete database request.");
+    }
+    
+    Response response = null;
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      context.put(databaseNameKey, databaseName);
+      
+      return JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new DeleteDatabaseDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class DeleteDatabaseDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      String databaseName = context.get(databaseNameKey);
+      
+      if (databaseName.isEmpty()) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "DatabaseName is 
empty string.");
+      }
+      
+      CatalogService catalogService = masterContext.getCatalog();
+      
+      if (!catalogService.existDatabase(databaseName)) {
+        return Response.status(Status.NOT_FOUND).build();
+      }
+      
+      boolean databaseDropped = catalogService.dropDatabase(databaseName);
+      
+      if (databaseDropped) {
+        return Response.ok().build();
+      } else {
+        return ResourcesUtil.createExceptionResponse(LOG, "Unable to drop a 
database " + databaseName);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java
 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java
new file mode 100644
index 0000000..9545b1a
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/FunctionsResource.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegate;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
+import org.apache.tajo.ws.rs.ResourcesUtil;
+
+@Path("/databases/{databaseName}/functions")
+public class FunctionsResource {
+  
+  private static final Log LOG = LogFactory.getLog(TablesResource.class);
+
+  @Context
+  UriInfo uriInfo;
+  
+  @Context
+  Application application;
+  
+  @PathParam("databaseName")
+  String databaseName;
+  
+  JerseyResourceDelegateContext context;
+  
+  private static final String databaseNameKeyName = "databaseName";
+  
+  private void initializeContext() {
+    context = new JerseyResourceDelegateContext();
+    JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+        
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+    context.put(uriInfoKey, uriInfo);
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getAllFunctions() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a retrieve all functions request.");
+    }
+    
+    Response response = null;
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      context.put(databaseNameKey, databaseName);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetAllFunctionsDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    return response;
+  }
+  
+  private static class GetAllFunctionsDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      Collection<FunctionDesc> functionDescriptors = 
masterContext.getCatalog().getFunctions();
+      if (functionDescriptors.size() > 0) {
+        List<FunctionSignature> functionSignature = 
+            new ArrayList<FunctionSignature>(functionDescriptors.size());
+        for (FunctionDesc functionDesc : functionDescriptors) {
+          functionSignature.add(functionDesc.getSignature());
+        }
+        
+        return Response.ok(functionSignature).build();
+      } else {
+        return Response.status(Status.NOT_FOUND).build();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32b524d7/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java 
b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
new file mode 100644
index 0000000..99609d7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.ws.rs.resources;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.QueryManager;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.InvalidSessionException;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.ws.rs.JerseyResourceDelegate;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
+import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
+import org.apache.tajo.ws.rs.ResourcesUtil;
+import org.apache.tajo.ws.rs.requests.SubmitQueryRequest;
+
+@Path("/databases/{databaseName}/queries")
+public class QueryResource {
+
+  private static final Log LOG = LogFactory.getLog(QueryResource.class);
+  
+  @Context
+  UriInfo uriInfo;
+  
+  @Context
+  Application application;
+  
+  @PathParam("databaseName")
+  String databaseName;
+  
+  JerseyResourceDelegateContext context;
+  
+  protected static final String tajoSessionIdHeaderName = "X-Tajo-Session";
+  
+  private static final String databaseNameKeyName = "databaseName";
+  private static final String stateKeyName = "state";
+  private static final String startTimeKeyName = "startTime";
+  private static final String endTimeKeyName = "endTime";
+  private static final String sessionIdKeyName = "sessionId";
+  private static final String submitQueryRequestKeyName = "submitQueryRequest";
+  private static final String printTypeKeyName = "printType";
+  private static final String queryIdKeyName = "queryId";
+  
+  private void initializeContext() {
+    context = new JerseyResourceDelegateContext();
+    JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+        
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+    context.put(uriInfoKey, uriInfo);
+    JerseyResourceDelegateContextKey<String> databaseNameKey =
+        JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+    context.put(databaseNameKey, databaseName);
+  }
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getAllQueries(@QueryParam("state") String state,
+      @QueryParam("startTime") long startTime,
+      @QueryParam("endTime") long endTime) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a get all queries request.");
+    }
+    
+    Response response = null;
+    
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> stateKey =
+          JerseyResourceDelegateContextKey.valueOf(stateKeyName, String.class);
+      if (state != null && !state.isEmpty()) {
+        context.put(stateKey, state);
+      }
+      JerseyResourceDelegateContextKey<Long> startTimeKey =
+          JerseyResourceDelegateContextKey.valueOf(startTimeKeyName, 
Long.class);
+      if (startTime > 0) {
+        context.put(startTimeKey, startTime);
+      }
+      JerseyResourceDelegateContextKey<Long> endTimeKey =
+          JerseyResourceDelegateContextKey.valueOf(endTimeKeyName, Long.class);
+      if (endTime > 0) {
+        context.put(endTimeKey, endTime);
+      }
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetAllQueriesDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class GetAllQueriesDelegate implements JerseyResourceDelegate 
{
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<String> stateKey =
+          JerseyResourceDelegateContextKey.valueOf(stateKeyName, String.class);
+      String state = context.get(stateKey);
+      JerseyResourceDelegateContextKey<Long> startTimeKey =
+          JerseyResourceDelegateContextKey.valueOf(startTimeKeyName, 
Long.class);
+      long startTime = context.get(startTimeKey);
+      JerseyResourceDelegateContextKey<Long> endTimeKey =
+          JerseyResourceDelegateContextKey.valueOf(endTimeKeyName, Long.class);
+      long endTime = context.get(endTimeKey);
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      TajoProtos.QueryState queryState = null;
+      try {
+        if (state != null && !state.isEmpty()) {
+          queryState = TajoProtos.QueryState.valueOf(state);
+        }
+      } catch (Exception e) {
+        return ResourcesUtil.createBadRequestResponse(LOG, state + " is not a 
valid query state.");
+      }
+      
+      Map<String, List<QueryInfo>> queriesMap = new HashMap<String, 
List<QueryInfo>>();
+      List<QueryInfo> queriesInfo = new ArrayList<QueryInfo>();
+      
+      QueryManager queryManager = masterContext.getQueryJobManager();
+      for (QueryInProgress queryInProgress: 
queryManager.getSubmittedQueries()) {
+        queriesInfo.add(queryInProgress.getQueryInfo());
+      }
+      
+      for (QueryInProgress queryInProgress: queryManager.getRunningQueries()) {
+        queriesInfo.add(queryInProgress.getQueryInfo());
+      }
+      
+      try {
+        queriesInfo.addAll(masterContext.getHistoryReader().getQueries(null));
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        return ResourcesUtil.createExceptionResponse(LOG, e.getMessage());
+      }
+      
+      if (state != null) {
+        queriesInfo = selectQueriesInfoByState(queriesInfo, queryState);
+      }
+      
+      if (startTime > 0 || endTime > 0) {
+        queriesInfo = selectQueriesInfoByTime(queriesInfo, startTime, endTime);
+      }
+      queriesMap.put("queries", queriesInfo);
+      
+      return Response.ok(queriesMap).build();
+    }
+    
+    private List<QueryInfo> selectQueriesInfoByState(List<QueryInfo> 
queriesInfo, TajoProtos.QueryState state) {
+      List<QueryInfo> resultQueriesInfo = new 
ArrayList<QueryInfo>(queriesInfo.size()/2);
+      
+      for (QueryInfo queryInfo: queriesInfo) {
+        if (state.equals(queryInfo.getQueryState())) {
+          resultQueriesInfo.add(queryInfo);
+        }
+      }
+      
+      return resultQueriesInfo;
+    }
+    
+    private List<QueryInfo> selectQueriesInfoByTime(List<QueryInfo> 
queriesInfo, long startTime, long endTime) {
+      List<QueryInfo> resultQueriesInfo = new 
ArrayList<QueryInfo>(queriesInfo.size()/2);
+      
+      for (QueryInfo queryInfo: queriesInfo) {
+        if (queryInfo.getStartTime() > startTime) {
+          resultQueriesInfo.add(queryInfo);
+        }
+        if (queryInfo.getStartTime() < endTime) {
+          resultQueriesInfo.add(queryInfo);
+        }
+      }
+      
+      return resultQueriesInfo;
+    }
+  }
+  
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response submitQuery(@HeaderParam(tajoSessionIdHeaderName) String 
sessionId,
+      SubmitQueryRequest request) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a submit query request.");
+    }
+    
+    Response response = null;
+    
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> sessionIdKey =
+          JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, 
String.class);
+      context.put(sessionIdKey, sessionId);
+      JerseyResourceDelegateContextKey<SubmitQueryRequest> 
submitQueryRequestKey =
+          JerseyResourceDelegateContextKey.valueOf(submitQueryRequestKeyName, 
SubmitQueryRequest.class);
+      context.put(submitQueryRequestKey, request);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new SubmitQueryDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class SubmitQueryDelegate implements JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<String> sessionIdKey =
+          JerseyResourceDelegateContextKey.valueOf(sessionIdKeyName, 
String.class);
+      String sessionId = context.get(sessionIdKey);
+      JerseyResourceDelegateContextKey<SubmitQueryRequest> 
submitQueryRequestKey =
+          JerseyResourceDelegateContextKey.valueOf(submitQueryRequestKeyName, 
SubmitQueryRequest.class);
+      SubmitQueryRequest request = context.get(submitQueryRequestKey);
+      JerseyResourceDelegateContextKey<String> databaseNameKey =
+          JerseyResourceDelegateContextKey.valueOf(databaseNameKeyName, 
String.class);
+      String databaseName = context.get(databaseNameKey);
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      if (sessionId == null || sessionId.isEmpty()) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "Session Id is null 
or empty string.");
+      }
+      if (request == null || request.getQuery() == null || 
request.getQuery().isEmpty()) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "query is null or 
emptry string.");
+      }
+      
+      Session session;
+      try {
+        session = masterContext.getSessionManager().getSession(sessionId);
+      } catch (InvalidSessionException e) {
+        return ResourcesUtil.createBadRequestResponse(LOG, "Provided session 
id (" + sessionId + ") is invalid.");
+      }
+      
+      SubmitQueryResponse response = 
+          masterContext.getGlobalEngine().executeQuery(session, 
request.getQuery(), false);
+      if (response.hasResultCode() && 
ClientProtos.ResultCode.ERROR.equals(response.getResultCode())) {
+        return ResourcesUtil.createExceptionResponse(LOG, 
response.getErrorMessage());
+      } else {
+        JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
+            
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.UriInfoKey, 
UriInfo.class);
+        UriInfo uriInfo = context.get(uriInfoKey);
+        
+        URI queryURI = uriInfo.getBaseUriBuilder()
+            .path(QueryResource.class)
+            .path(QueryResource.class, "getQuery")
+            .build(databaseName, new 
QueryId(response.getQueryId()).toString());
+        return Response.created(queryURI).build();
+      }
+    }
+  }
+  
+  @GET
+  @Path("{queryId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getQuery(@PathParam("queryId") String queryId, 
@QueryParam("print") String printType) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a get query request.");
+    }
+    
+    Response response = null;
+    
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> queryIdKey =
+          JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, 
String.class);
+      context.put(queryIdKey, queryId);
+      JerseyResourceDelegateContextKey<String> printTypeKey =
+          JerseyResourceDelegateContextKey.valueOf(printTypeKeyName, 
String.class);
+      context.put(printTypeKey, printType);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new GetQueryDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class GetQueryDelegate implements JerseyResourceDelegate {
+    
+    private static final String briefPrint = "BRIEF";
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<String> queryIdKey =
+          JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, 
String.class);
+      String queryId = context.get(queryIdKey);
+      JerseyResourceDelegateContextKey<String> printTypeKey =
+          JerseyResourceDelegateContextKey.valueOf(printTypeKeyName, 
String.class);
+      String printType = context.get(printTypeKey);
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId);
+      
+      QueryManager queryManager = masterContext.getQueryJobManager();
+      QueryInProgress queryInProgress = 
queryManager.getQueryInProgress(queryIdObj);
+      
+      QueryInfo queryInfo = null;
+      if (queryInProgress == null) {
+        queryInfo = queryManager.getFinishedQuery(queryIdObj);
+      } else {
+        queryInfo = queryInProgress.getQueryInfo();
+      }
+      
+      if (queryInfo != null) {
+        if (briefPrint.equalsIgnoreCase(printType)) {
+          queryInfo = getBriefQueryInfo(queryInfo);
+        }
+        return Response.ok(queryInfo).build();
+      } else {
+        return Response.status(Status.NOT_FOUND).build();
+      }
+    }
+    
+    private QueryInfo getBriefQueryInfo(QueryInfo queryInfo) {
+      QueryInfo newQueryInfo = new QueryInfo(queryInfo.getQueryId(), null, 
null, null);
+      newQueryInfo.setQueryState(queryInfo.getQueryState());
+      newQueryInfo.setStartTime(queryInfo.getStartTime());
+      return newQueryInfo;
+    }
+  }
+  
+  @DELETE
+  @Path("{queryId}")
+  public Response terminateQuery(@PathParam("queryId") String queryId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Client sent a terminate query request.");
+    }
+    
+    Response response = null;
+    
+    try {
+      initializeContext();
+      JerseyResourceDelegateContextKey<String> queryIdKey =
+          JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, 
String.class);
+      context.put(queryIdKey, queryId);
+      
+      response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
+          new TerminateQueryDelegate(),
+          application,
+          context,
+          LOG);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      
+      response = ResourcesUtil.createExceptionResponse(null, e.getMessage());
+    }
+    
+    return response;
+  }
+  
+  private static class TerminateQueryDelegate implements 
JerseyResourceDelegate {
+
+    @Override
+    public Response run(JerseyResourceDelegateContext context) {
+      JerseyResourceDelegateContextKey<String> queryIdKey =
+          JerseyResourceDelegateContextKey.valueOf(queryIdKeyName, 
String.class);
+      String queryId = context.get(queryIdKey);
+      JerseyResourceDelegateContextKey<MasterContext> masterContextKey =
+          
JerseyResourceDelegateContextKey.valueOf(JerseyResourceDelegateUtil.MasterContextKey,
 MasterContext.class);
+      MasterContext masterContext = context.get(masterContextKey);
+      
+      QueryId queryIdObj = TajoIdUtils.parseQueryId(queryId);
+      
+      QueryManager queryManager = masterContext.getQueryJobManager();
+      queryManager.getEventHandler().handle(new 
QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+          new QueryInfo(queryIdObj)));
+      return Response.ok().build();
+    }
+  }
+  
+  @Path("/{queryId}/result")
+  public QueryResultResource getQueryResult(@PathParam("queryId") String 
queryId) {
+    QueryResultResource queryResultResource = new QueryResultResource();
+    queryResultResource.setUriInfo(uriInfo);
+    queryResultResource.setApplication(application);
+    queryResultResource.setDatabaseName(databaseName);
+    queryResultResource.setQueryId(queryId);
+    return queryResultResource;
+  }
+}

Reply via email to