http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-jdbc/pom.xml 
b/tajo-storage/tajo-storage-jdbc/pom.xml
new file mode 100644
index 0000000..35000a4
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/pom.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-jdbc</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo JDBC storage common</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/*.sql</exclude>
+          </excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m 
-Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                
<argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                
<argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/JdbcFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
new file mode 100644
index 0000000..2910ea5
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/ConnectionInfo.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import org.apache.tajo.exception.TajoInternalError;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConnectionInfo {
+  String scheme;
+  String host;
+  String dbName;
+  String tableName;
+  String user;
+  String password;
+  Map<String, String> params;
+
+  public String scheme() {
+    return scheme;
+  }
+
+  public String database() {
+    return dbName;
+  }
+
+  public String table() {
+    return tableName;
+  }
+
+  public String user() {
+    return user;
+  }
+
+  public String password() {
+    return password;
+  }
+
+  public static ConnectionInfo fromURI(String originalUri) {
+    return fromURI(URI.create(originalUri));
+  }
+
+  public static ConnectionInfo fromURI(URI originalUri) {
+    final String uriStr = originalUri.toASCIIString();
+    URI uri = originalUri;
+
+    final ConnectionInfo connInfo = new ConnectionInfo();
+    connInfo.scheme = uriStr.substring(0, uriStr.indexOf("://"));
+
+    if (connInfo.scheme.split(":").length > 1) {
+      int idx = uriStr.indexOf(':');
+      uri = URI.create(uriStr.substring(idx + 1));
+    }
+
+    connInfo.host = uri.getHost();
+
+    String path = uri.getPath();
+    if (path != null && !path.isEmpty()) {
+      String [] pathElements = path.substring(1).split("/");
+      if (pathElements.length != 1) {
+        throw new TajoInternalError("Invalid JDBC path: " + path);
+      }
+      connInfo.dbName = pathElements[0];
+    }
+
+    Map<String, String> params = new HashMap<>();
+
+    int paramIndex = uriStr.indexOf("?");
+    if (paramIndex > 0) {
+      String parameterPart = uriStr.substring(paramIndex+1, uriStr.length());
+
+      String [] eachParam = parameterPart.split("&");
+
+      for (String each: eachParam) {
+        String [] keyValues = each.split("=");
+        if (keyValues.length != 2) {
+          throw new TajoInternalError("Invalid URI Parameters: " + 
parameterPart);
+        }
+        params.put(keyValues[0], keyValues[1]);
+      }
+    }
+
+    if (params.containsKey("table")) {
+      connInfo.tableName = params.remove("table");
+    }
+
+    if (params.containsKey("user")) {
+      connInfo.user = params.remove("user");
+    }
+    if (params.containsKey("password")) {
+      connInfo.password = params.remove("password");
+    }
+
+    connInfo.params = params;
+
+    return connInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
new file mode 100644
index 0000000..bf9536e
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
+import org.apache.tajo.util.TUtil;
+
+public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, 
Cloneable {
+  String uri;
+  String inputSourceId;
+  String [] hostNames;
+
+
+  public JdbcFragment(ByteString raw) throws InvalidProtocolBufferException {
+    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
+    builder.mergeFrom(raw);
+    builder.build();
+    init(builder.build());
+  }
+
+  public JdbcFragment(String inputSourceId, String uri) {
+    this.inputSourceId = inputSourceId;
+    this.uri = uri;
+    this.hostNames = extractHosts(uri);
+  }
+
+  private void init(JdbcFragmentProto proto) {
+    this.uri = proto.getUri();
+    this.inputSourceId = proto.getInputSourceId();
+    this.hostNames = proto.getHostsList().toArray(new String 
[proto.getHostsCount()]);
+  }
+
+  private String [] extractHosts(String uri) {
+    return new String[] {ConnectionInfo.fromURI(uri).host};
+  }
+
+  @Override
+  public String getTableName() {
+    return inputSourceId;
+  }
+
+  public String getUri() {
+    return uri;
+  }
+
+  @Override
+  public long getLength() {
+    return 0;
+  }
+
+  @Override
+  public String getKey() {
+    return null;
+  }
+
+  @Override
+  public String[] getHosts() {
+    return hostNames;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return false;
+  }
+
+  @Override
+  public CatalogProtos.FragmentProto getProto() {
+    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
+    builder.setInputSourceId(this.inputSourceId);
+    builder.setUri(this.uri);
+    if(hostNames != null) {
+      builder.addAllHosts(TUtil.newList(hostNames));
+    }
+
+    CatalogProtos.FragmentProto.Builder fragmentBuilder = 
CatalogProtos.FragmentProto.newBuilder();
+    fragmentBuilder.setId(this.inputSourceId);
+    fragmentBuilder.setStoreType("JDBC");
+    fragmentBuilder.setContents(builder.buildPartial().toByteString());
+    return fragmentBuilder.build();
+  }
+
+  @Override
+  public int compareTo(JdbcFragment o) {
+    return this.uri.compareTo(o.uri);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
new file mode 100644
index 0000000..eff1b9c
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcMetadataProviderBase.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.*;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.sql.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.tajo.catalog.CatalogUtil.newSimpleDataType;
+
+public abstract class JdbcMetadataProviderBase implements MetadataProvider {
+  protected static final Log LOG = 
LogFactory.getLog(JdbcMetadataProviderBase.class);
+
+  public static String [] GENERAL_TABLE_TYPES = new String [] {"TABLE"};
+
+  protected final JdbcTablespace space;
+  protected final String databaseName;
+
+  protected final String jdbcUri;
+
+  protected final Connection connection;
+
+  public JdbcMetadataProviderBase(JdbcTablespace space, String dbName) {
+    this.space = space;
+    this.databaseName = dbName;
+
+    ConnectionInfo connInfo = ConnectionInfo.fromURI(space.getUri());
+    this.jdbcUri  = space.getUri().toASCIIString();
+
+    try {
+      Class.forName(getJdbcDriverName()).newInstance();
+      LOG.info(getJdbcDriverName() + " is loaded...");
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    }
+
+    try {
+      connection = DriverManager.getConnection(jdbcUri, space.connProperties);
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    }
+  }
+
+  @Override
+  public String getTablespaceName() {
+    return space.getName();
+  }
+
+  @Override
+  public URI getTablespaceUri() {
+    return space.getUri();
+  }
+
+  @Override
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  @Override
+  public Collection<String> getSchemas() {
+    return Collections.EMPTY_SET;
+  }
+
+  @Override
+  public Collection<String> getTables(@Nullable String schemaPattern, 
@Nullable String tablePattern) {
+    ResultSet res = null;
+    List<String> tableNames = Lists.newArrayList();
+    try {
+      res = connection.getMetaData().getTables(databaseName, schemaPattern, 
tablePattern, GENERAL_TABLE_TYPES);
+      while(res.next()) {
+        tableNames.add(res.getString("TABLE_NAME"));
+      }
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    } finally {
+      try {
+        if (res != null) {
+          res.close();
+        }
+      } catch (SQLException e) {
+        LOG.warn(e);
+      }
+    }
+
+    return tableNames;
+  }
+
+  private TypeDesc convertDataType(ResultSet res) throws SQLException {
+    final int typeId = res.getInt("DATA_TYPE");
+
+    switch (typeId ) {
+    case Types.BOOLEAN:
+      return new TypeDesc(newSimpleDataType(Type.BOOLEAN));
+
+    case Types.TINYINT:
+    case Types.SMALLINT:
+    case Types.INTEGER:
+      return new TypeDesc(newSimpleDataType(Type.INT4));
+
+    case Types.DISTINCT: // sequence for postgresql
+    case Types.BIGINT:
+      return new TypeDesc(newSimpleDataType(Type.INT8));
+
+    case Types.FLOAT:
+      return new TypeDesc(newSimpleDataType(Type.FLOAT4));
+
+    case Types.NUMERIC:
+    case Types.DECIMAL:
+    case Types.DOUBLE:
+      return new TypeDesc(newSimpleDataType(Type.FLOAT8));
+
+    case Types.DATE:
+      return new TypeDesc(newSimpleDataType(Type.DATE));
+
+    case Types.TIME:
+      return new TypeDesc(newSimpleDataType(Type.TIME));
+
+    case Types.TIMESTAMP:
+      return new TypeDesc(newSimpleDataType(Type.TIMESTAMP));
+
+    case Types.CHAR:
+    case Types.NCHAR:
+    case Types.VARCHAR:
+    case Types.NVARCHAR:
+    case Types.CLOB:
+    case Types.NCLOB:
+    case Types.LONGVARCHAR:
+    case Types.LONGNVARCHAR:
+      return new TypeDesc(newSimpleDataType(Type.TEXT));
+
+    case Types.BINARY:
+    case Types.VARBINARY:
+    case Types.BLOB:
+      return new TypeDesc(newSimpleDataType(Type.BLOB));
+
+    default:
+      throw SQLExceptionUtil.toSQLException(new 
UnsupportedDataTypeException(typeId + ""));
+    }
+  }
+
+  @Override
+  public TableDesc getTableDesc(String schemaName, String tableName) throws 
UndefinedTablespaceException {
+    ResultSet resultForTable = null;
+    ResultSet resultForColumns = null;
+    try {
+
+      // get table name
+      resultForTable = connection.getMetaData().getTables(databaseName, 
schemaName, tableName, null);
+
+      if (!resultForTable.next()) {
+        throw new UndefinedTablespaceException(tableName);
+      }
+      final String name = resultForTable.getString("TABLE_NAME");
+
+      // get columns
+      resultForColumns = connection.getMetaData().getColumns(databaseName, 
schemaName, tableName, null);
+
+      final List<Pair<Integer, Column>> columns = Lists.newArrayList();
+
+      while(resultForColumns.next()) {
+        final int ordinalPos = resultForColumns.getInt("ORDINAL_POSITION");
+        final String qualifier = resultForColumns.getString("TABLE_NAME");
+        final String columnName = resultForColumns.getString("COLUMN_NAME");
+        final TypeDesc type = convertDataType(resultForColumns);
+        final Column c = new Column(CatalogUtil.buildFQName(databaseName, 
qualifier, columnName), type);
+
+        columns.add(new Pair<>(ordinalPos, c));
+      }
+
+      // sort columns in an order of ordinal position
+      Collections.sort(columns, new Comparator<Pair<Integer, Column>>() {
+        @Override
+        public int compare(Pair<Integer, Column> o1, Pair<Integer, Column> o2) 
{
+          return o1.getFirst() - o2.getFirst();
+        }
+      });
+
+      // transform the pair list into collection for columns
+      final Schema schema = new Schema(Collections2.transform(columns, new 
Function<Pair<Integer,Column>, Column>() {
+        @Override
+        public Column apply(@Nullable Pair<Integer, Column> columnPair) {
+          return columnPair.getSecond();
+        }
+      }));
+
+
+      // fill the table stats
+      final TableStats stats = new TableStats();
+      stats.setNumRows(-1); // unknown
+
+      final TableDesc table = new TableDesc(
+          CatalogUtil.buildFQName(databaseName, name),
+          schema,
+          new TableMeta("rowstore", new KeyValueSet()),
+          space.getTableUri(databaseName, name)
+      );
+      table.setStats(stats);
+
+      return table;
+
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    } finally {
+      try {
+        if (resultForTable != null) {
+          resultForTable.close();
+        }
+
+        if (resultForColumns != null) {
+          resultForColumns.close();
+        }
+
+      } catch (SQLException e) {
+        LOG.warn(e);
+      }
+    }
+  }
+
+  protected abstract String getJdbcDriverName();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
new file mode 100644
index 0000000..82c3be3
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TimeDatum;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.*;
+import java.util.Iterator;
+import java.util.Properties;
+
+public abstract class JdbcScanner implements Scanner {
+  private static final Log LOG = LogFactory.getLog(JdbcScanner.class);
+
+  protected final DatabaseMetaData dbMetaData;
+  /** JDBC Connection Properties */
+  protected final Properties connProperties;
+  protected final String tableName;
+  protected final Schema schema;
+  protected final TableMeta tableMeta;
+  protected final JdbcFragment fragment;
+  protected final TableStats stats;
+  protected final SQLBuilder builder;
+
+  protected Column [] targets;
+  protected EvalNode filter;
+  protected Long limit;
+  protected LogicalNode planPart;
+  protected VTuple outTuple;
+  protected String generatedSql;
+  protected ResultSetIterator iter;
+
+  protected int recordCount = 0;
+
+  /**
+   *
+   * @param dbMetaData     DatabaseMetaData
+   * @param connProperties JDBC Connection Properties
+   * @param tableSchema    Table Schema
+   * @param tableMeta      Table Properties
+   * @param fragment       Fragment
+   */
+  public JdbcScanner(final DatabaseMetaData dbMetaData,
+                     final Properties connProperties,
+                     final Schema tableSchema,
+                     final TableMeta tableMeta,
+                     final JdbcFragment fragment) {
+
+    Preconditions.checkNotNull(dbMetaData);
+    Preconditions.checkNotNull(connProperties);
+    Preconditions.checkNotNull(tableSchema);
+    Preconditions.checkNotNull(tableMeta);
+    Preconditions.checkNotNull(fragment);
+
+    this.dbMetaData = dbMetaData;
+    this.connProperties = connProperties;
+    this.tableName = ConnectionInfo.fromURI(fragment.getUri()).tableName;
+    this.schema = tableSchema;
+    this.tableMeta = tableMeta;
+    this.fragment = fragment;
+    this.stats = new TableStats();
+    builder = getSQLBuilder();
+  }
+
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+    outTuple = new VTuple(targets.length);
+
+    if (planPart == null) {
+      generatedSql = builder.build(tableName, targets, filter, limit);
+    } else {
+      generatedSql = builder.build(planPart);
+    }
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (iter == null) {
+      iter = executeQueryAndGetIter();
+    }
+
+    if (iter.hasNext()) {
+      return iter.next();
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (iter != null) {
+      iter.rewind();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (iter != null) {
+      iter.close();
+    }
+  }
+
+  @Override
+  public void pushOperators(LogicalNode planPart) {
+    this.planPart = planPart;
+  }
+
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public void setTarget(Column [] targets) {
+    this.targets = targets;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return true;
+  }
+
+  @Override
+  public void setFilter(EvalNode filter) {
+    this.filter = filter;
+  }
+
+  @Override
+  public void setLimit(long num) {
+    this.limit = num;
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return stats;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  protected SQLBuilder getSQLBuilder() {
+    return new SQLBuilder(dbMetaData, getSQLExprBuilder());
+  }
+  protected SQLExpressionGenerator getSQLExprBuilder() {
+    return new SQLExpressionGenerator(dbMetaData);
+  }
+
+  protected void convertTuple(ResultSet resultSet, VTuple tuple) {
+    try {
+      for (int column_idx = 0; column_idx < targets.length; column_idx++) {
+        final Column c = targets[column_idx];
+        final int resultIdx = column_idx + 1;
+
+        switch (c.getDataType().getType()) {
+        case INT1:
+        case INT2:
+          tuple.put(column_idx, 
DatumFactory.createInt2(resultSet.getShort(resultIdx)));
+          break;
+        case INT4:
+          tuple.put(column_idx, 
DatumFactory.createInt4(resultSet.getInt(resultIdx)));
+          break;
+        case INT8:
+          tuple.put(column_idx, 
DatumFactory.createInt8(resultSet.getLong(resultIdx)));
+          break;
+        case FLOAT4:
+          tuple.put(column_idx, 
DatumFactory.createFloat4(resultSet.getFloat(resultIdx)));
+          break;
+        case FLOAT8:
+          tuple.put(column_idx, 
DatumFactory.createFloat8(resultSet.getDouble(resultIdx)));
+          break;
+        case CHAR:
+          tuple.put(column_idx, 
DatumFactory.createText(resultSet.getString(resultIdx)));
+          break;
+        case VARCHAR:
+        case TEXT:
+          // TODO - trim is unnecessary in many cases, so we can use it for 
certain cases
+          tuple.put(column_idx, 
DatumFactory.createText(resultSet.getString(resultIdx).trim()));
+          break;
+        case DATE:
+          final Date date = resultSet.getDate(resultIdx);
+          tuple.put(column_idx, DatumFactory.createDate(1900 + date.getYear(), 
1 + date.getMonth(), date.getDate()));
+          break;
+        case TIME:
+          tuple.put(column_idx, new 
TimeDatum(resultSet.getTime(resultIdx).getTime() * 1000));
+          break;
+        case TIMESTAMP:
+          tuple.put(column_idx,
+              
DatumFactory.createTimestmpDatumWithJavaMillis(resultSet.getTimestamp(resultIdx).getTime()));
+          break;
+        case BINARY:
+        case VARBINARY:
+        case BLOB:
+          tuple.put(column_idx,
+              DatumFactory.createBlob(resultSet.getBytes(resultIdx)));
+          break;
+        default:
+          throw new TajoInternalError(new 
UnsupportedDataTypeException(c.getDataType().getType().name()));
+        }
+      }
+    } catch (SQLException s) {
+      throw new TajoInternalError(s);
+    }
+  }
+
+  private ResultSetIterator executeQueryAndGetIter() {
+    try {
+      LOG.info("Generated SQL: " + generatedSql);
+      Connection conn = DriverManager.getConnection(fragment.uri, 
connProperties);
+      Statement statement = conn.createStatement();
+      ResultSet resultset = statement.executeQuery(generatedSql);
+      return new ResultSetIterator((resultset));
+    } catch (SQLException s) {
+      throw new TajoInternalError(s);
+    }
+  }
+
+  public class ResultSetIterator implements Iterator<Tuple>, Closeable {
+
+    private final ResultSet resultSet;
+
+    private boolean didNext = false;
+    private boolean hasNext = false;
+
+    public ResultSetIterator(ResultSet resultSet) {
+      this.resultSet = resultSet;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (!didNext) {
+
+        try {
+          hasNext = resultSet.next();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+
+        didNext = true;
+      }
+      return hasNext;
+    }
+
+    @Override
+    public Tuple next() {
+      if (!didNext) {
+        try {
+          resultSet.next();
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      didNext = false;
+      convertTuple(resultSet, outTuple);
+      recordCount++;
+      return outTuple;
+    }
+
+    @Override
+    public void remove() {
+      throw new TajoRuntimeException(new UnsupportedException());
+    }
+
+    public void rewind() {
+      try {
+        resultSet.isBeforeFirst();
+      } catch (SQLException e) {
+        throw new TajoInternalError(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        resultSet.close();
+      } catch (SQLException e) {
+        LOG.warn(e);
+      }
+
+      if (stats != null) {
+        stats.setNumRows(recordCount);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
new file mode 100644
index 0000000..7489307
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import net.minidev.json.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.UriUtil;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * JDBC Tablespace
+ */
+public abstract class JdbcTablespace extends Tablespace {
+  private static final Log LOG = LogFactory.getLog(JdbcTablespace.class);
+
+  static final StorageProperty STORAGE_PROPERTY = new 
StorageProperty("rowstore", false, true, false, true);
+  static final FormatProperty  FORMAT_PROPERTY = new FormatProperty(false, 
false, false);
+
+  /**
+   * required configuration
+   */
+  public static final String CONFIG_KEY_MAPPED_DATABASE = "mapped_database";
+  /**
+   * optional configuration
+   */
+  public static final String CONFIG_KEY_CONN_PROPERTIES = 
"connection_properties";
+
+  public static final String URI_PARAM_KEY_TABLE = "table";
+
+  protected Connection conn;
+  protected String database;
+  protected Properties connProperties = new Properties();
+
+  public JdbcTablespace(String name, URI uri, JSONObject config) {
+    super(name, uri, config);
+    setDatabase();
+    setJdbcProperties();
+  }
+
+  private void setDatabase() {
+    if (config.containsKey(CONFIG_KEY_MAPPED_DATABASE)) {
+      database = this.config.getAsString(CONFIG_KEY_MAPPED_DATABASE);
+    } else {
+      database = ConnectionInfo.fromURI(uri).database();
+    }
+  }
+
+  private void setJdbcProperties() {
+    Object connPropertiesObjects = config.get(CONFIG_KEY_CONN_PROPERTIES);
+    if (connPropertiesObjects != null) {
+      Preconditions.checkState(connPropertiesObjects instanceof JSONObject, 
"Invalid jdbc_properties field in configs");
+      JSONObject connProperties = (JSONObject) connPropertiesObjects;
+
+      for (Map.Entry<String, Object> entry : connProperties.entrySet()) {
+        this.connProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  @Override
+  protected void storageInit() throws IOException {
+    try {
+      this.conn = DriverManager.getConnection(uri.toASCIIString(), 
connProperties);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws UnsupportedException {
+    throw new UnsupportedException();
+  }
+
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return URI.create(UriUtil.addParam(getUri().toASCIIString(), 
URI_PARAM_KEY_TABLE, tableName));
+  }
+
+  @Override
+  public List<Fragment> getSplits(String inputSourceId,
+                                  TableDesc tableDesc,
+                                  @Nullable EvalNode filterCondition) throws 
IOException {
+    return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, 
tableDesc.getUri().toASCIIString()));
+  }
+
+  @Override
+  public StorageProperty getProperty() {
+    return STORAGE_PROPERTY;
+  }
+
+  @Override
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return FORMAT_PROPERTY;
+  }
+
+  @Override
+  public void close() {
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        LOG.warn(e);
+      }
+    }
+  }
+
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext,
+                                          TableDesc tableDesc,
+                                          Schema inputSchema,
+                                          SortSpec[] sortSpecs,
+                                          TupleRange dataRange) throws 
IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws 
IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void prepareTable(LogicalNode node) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId 
finalEbId, LogicalPlan plan, Schema schema,
+                          TableDesc tableDesc) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void rollbackTable(LogicalNode node) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta 
meta) throws IOException {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  public abstract MetadataProvider getMetadataProvider();
+
+  @Override
+  public abstract Scanner getScanner(TableMeta meta,
+                            Schema schema,
+                            Fragment fragment,
+                            @Nullable Schema target) throws IOException;
+
+  public DatabaseMetaData getDatabaseMetaData() {
+    try {
+      return conn.getMetaData();
+    } catch (SQLException e) {
+      throw new TajoInternalError(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
new file mode 100644
index 0000000..f72d6ee
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLBuilder.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.util.StringUtils;
+
+import javax.annotation.Nullable;
+import java.sql.DatabaseMetaData;
+import java.util.Stack;
+
+/**
+ * Generator to build a SQL statement from a plan fragment
+ */
+public class SQLBuilder {
+  @SuppressWarnings("unused")
+  private final DatabaseMetaData dbMetaData;
+  private final SQLExpressionGenerator sqlExprGen;
+
+  public static class SQLBuilderContext {
+    StringBuilder sb;
+  }
+
+  public SQLBuilder(DatabaseMetaData dbMetaData, SQLExpressionGenerator 
exprGen) {
+    this.dbMetaData = dbMetaData;
+    this.sqlExprGen = exprGen;
+  }
+
+  public String build(String tableName, Column [] targets, @Nullable EvalNode 
filter, @Nullable Long limit) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    if (targets.length > 0) {
+      selectClause.append(StringUtils.join(targets, ",", new Function<Column, 
String>() {
+        @Override
+        public String apply(@Nullable Column input) {
+          return input.getSimpleName();
+        }
+      }));
+    } else {
+      selectClause.append("1");
+    }
+    selectClause.append(" ");
+
+    StringBuilder fromClause = new StringBuilder("FROM ");
+    fromClause.append(tableName).append(" ");
+
+    StringBuilder whereClause = null;
+    if (filter != null) {
+      whereClause = new StringBuilder("WHERE ");
+      whereClause.append(sqlExprGen.generate(filter)).append(" ");
+    }
+
+    StringBuilder limitClause = null;
+    if (limit != null) {
+      limitClause = new StringBuilder("LIMIT ");
+      limitClause.append(limit).append(" ");
+    }
+
+    return generateSelectStmt(selectClause, fromClause, whereClause, 
limitClause);
+  }
+
+  public String generateSelectStmt(StringBuilder selectClause,
+                                   StringBuilder fromClause,
+                                   @Nullable StringBuilder whereClause,
+                                   @Nullable StringBuilder limitClause) {
+    return
+        selectClause.toString() +
+        fromClause.toString() +
+        (whereClause != null ? whereClause.toString() : "") +
+        (limitClause != null ? limitClause.toString() : "");
+  }
+
+  public String build(LogicalNode planPart) {
+    SQLBuilderContext context = new SQLBuilderContext();
+    visit(context, planPart, new Stack<LogicalNode>());
+    return context.sb.toString();
+  }
+
+  public void visit(SQLBuilderContext context, LogicalNode node, 
Stack<LogicalNode> stack) {
+    stack.push(node);
+
+    switch (node.getType()) {
+    case SCAN:
+      visitScan(context, (ScanNode) node, stack);
+      break;
+
+    case GROUP_BY:
+      visitGroupBy(context, (GroupbyNode) node, stack);
+      break;
+
+    case SELECTION:
+      visitFilter(context, (SelectionNode) node, stack);
+      break;
+
+    case PROJECTION:
+      visitProjection(context, (ProjectionNode) node, stack);
+      break;
+
+    case TABLE_SUBQUERY:
+      visitDerivedSubquery(context, (TableSubQueryNode) node, stack);
+      break;
+
+    default:
+      throw new TajoRuntimeException(new UnsupportedException("plan node '" + 
node.getType().name() + "'"));
+    }
+
+    stack.pop();
+  }
+
+  public void visitDerivedSubquery(SQLBuilderContext ctx, TableSubQueryNode 
derivedSubquery, Stack<LogicalNode> stack) {
+    ctx.sb.append(" (");
+    visit(ctx, derivedSubquery.getSubQuery(), stack);
+    ctx.sb.append(" ) ").append(derivedSubquery.getTableName());
+  }
+
+  public void visitProjection(SQLBuilderContext ctx, ProjectionNode 
projection, Stack<LogicalNode> stack) {
+
+    visit(ctx, projection.getChild(), stack);
+  }
+
+  public void visitGroupBy(SQLBuilderContext ctx, GroupbyNode groupby, 
Stack<LogicalNode> stack) {
+    visit(ctx, groupby.getChild(), stack);
+    ctx.sb.append("GROUP BY 
").append(StringUtils.join(groupby.getGroupingColumns(), ",", 0)).append(" ");
+  }
+
+  public void visitFilter(SQLBuilderContext ctx, SelectionNode filter, 
Stack<LogicalNode> stack) {
+    visit(ctx, filter.getChild(), stack);
+    ctx.sb.append("WHERE " + sqlExprGen.generate(filter.getQual()));
+  }
+
+  public void visitScan(SQLBuilderContext ctx, ScanNode scan, 
Stack<LogicalNode> stack) {
+
+    StringBuilder selectClause = new StringBuilder("SELECT ");
+    if (scan.getTargets().length > 0) {
+      selectClause.append(generateTargetList(scan.getTargets()));
+    } else {
+      selectClause.append("1");
+    }
+
+    selectClause.append(" ");
+
+    ctx.sb.append("FROM ").append(scan.getTableName()).append(" ");
+
+    if (scan.hasAlias()) {
+      ctx.sb.append("AS ").append(scan.getAlias()).append(" ");
+    }
+
+    if (scan.hasQual()) {
+      ctx.sb.append("WHERE " + sqlExprGen.generate(scan.getQual()));
+    }
+  }
+
+  public String generateTargetList(Target [] targets) {
+    return StringUtils.join(targets, ",", new Function<Target, String>() {
+      @Override
+      public String apply(@Nullable Target t) {
+        StringBuilder sb = new 
StringBuilder(sqlExprGen.generate(t.getEvalTree()));
+        if (t.hasAlias()) {
+          sb.append(" AS ").append(t.getAlias());
+        }
+        return sb.toString();
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
new file mode 100644
index 0000000..4d4c781
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/SQLExpressionGenerator.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.common.base.Function;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.BooleanDatum;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedDataTypeException;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.util.StringUtils;
+
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Stack;
+
+/**
+ * A generator to build a SQL representation from a sql expression
+ */
+public class SQLExpressionGenerator extends 
SimpleEvalNodeVisitor<SQLExpressionGenerator.Context> {
+  final private DatabaseMetaData dbMetaData;
+
+  private final String LITERAL_QUOTE = "'";
+  @SuppressWarnings("unused")
+  private final String DEFAULT_LITERAL_QUOTE = "'";
+  @SuppressWarnings("unused")
+  private String IDENTIFIER_QUOTE = "\"";
+  private final String DEFAULT_IDENTIFIER_QUOTE = "\"";
+
+  public SQLExpressionGenerator(DatabaseMetaData dbMetaData) {
+    this.dbMetaData = dbMetaData;
+    initDatabaseDependentSQLRepr();
+  }
+
+  private void initDatabaseDependentSQLRepr() {
+    String quoteStr = null;
+    try {
+      quoteStr = dbMetaData.getIdentifierQuoteString();
+    } catch (SQLException e) {
+    }
+    this.IDENTIFIER_QUOTE = quoteStr != null ? quoteStr : 
DEFAULT_IDENTIFIER_QUOTE;
+  }
+
+  public String quote(String text) {
+    return LITERAL_QUOTE + text + LITERAL_QUOTE;
+  }
+
+  public String generate(EvalNode node) {
+    Context context = new Context();
+    visit(context, node, new Stack<EvalNode>());
+    return context.sb.toString();
+  }
+
+  public static class Context {
+    StringBuilder sb = new StringBuilder();
+
+    public void append(String text) {
+      sb.append(text).append(" ");
+    }
+  }
+
+  @Override
+  protected EvalNode visitUnaryEval(Context context, UnaryEval unary, 
Stack<EvalNode> stack) {
+
+    switch (unary.getType()) {
+    case NOT:
+      context.sb.append("NOT ");
+      super.visitUnaryEval(context, unary, stack);
+      break;
+    case SIGNED:
+      SignedEval signed = (SignedEval) unary;
+      if (signed.isNegative()) {
+        context.sb.append("-");
+      }
+      super.visitUnaryEval(context, unary, stack);
+      break;
+    case IS_NULL:
+      super.visitUnaryEval(context, unary, stack);
+
+      IsNullEval isNull = (IsNullEval) unary;
+      if (isNull.isNot()) {
+        context.sb.append("IS NOT NULL ");
+      } else {
+        context.sb.append("IS NULL ");
+      }
+      break;
+
+    case CAST:
+      super.visitUnaryEval(context, unary, stack);
+      context.sb.append(" AS 
").append(convertTajoTypeToSQLType(unary.getValueType()));
+    }
+    return unary;
+  }
+
+  @Override
+  protected EvalNode visitBinaryEval(Context context, Stack<EvalNode> stack, 
BinaryEval binaryEval) {
+    stack.push(binaryEval);
+    visit(context, binaryEval.getLeftExpr(), stack);
+    
context.sb.append(convertBinOperatorToSQLRepr(binaryEval.getType())).append(" 
");
+    visit(context, binaryEval.getRightExpr(), stack);
+    stack.pop();
+    return binaryEval;
+  }
+
+  @Override
+  protected EvalNode visitConst(Context context, ConstEval constant, 
Stack<EvalNode> stack) {
+    context.sb.append(convertDatumToSQLLiteral(constant.getValue())).append(" 
");
+    return constant;
+  }
+
+  protected EvalNode visitRowConstant(Context context, RowConstantEval row, 
Stack<EvalNode> stack) {
+    StringBuilder sb = new StringBuilder("(");
+    sb.append(StringUtils.join(row.getValues(), ",", new Function<Datum, 
String>() {
+      @Override
+      public String apply(Datum v) {
+        return convertDatumToSQLLiteral(v);
+      }
+    }));
+    sb.append(")");
+    context.append(sb.toString());
+
+    return row;
+  }
+
+  @Override
+  protected EvalNode visitField(Context context, FieldEval field, 
Stack<EvalNode> stack) {
+    // strip the database name
+    String tableName;
+    if (CatalogUtil.isSimpleIdentifier(field.getQualifier())) {
+      tableName = field.getQualifier();
+    } else {
+      tableName = CatalogUtil.extractSimpleName(field.getQualifier());
+    }
+
+    context.append(CatalogUtil.buildFQName(tableName, field.getColumnName()));
+    return field;
+  }
+
+  @Override
+  protected EvalNode visitBetween(Context context, BetweenPredicateEval 
evalNode, Stack<EvalNode> stack) {
+    stack.push(evalNode);
+    visit(context, evalNode.getPredicand(), stack);
+    context.append("BETWEEN");
+    visit(context, evalNode.getBegin(), stack);
+    context.append("AND");
+    visit(context, evalNode.getEnd(), stack);
+    return evalNode;
+  }
+
+  @Override
+  protected EvalNode visitCaseWhen(Context context, CaseWhenEval evalNode, 
Stack<EvalNode> stack) {
+    stack.push(evalNode);
+    context.append("CASE");
+    for (CaseWhenEval.IfThenEval ifThenEval : evalNode.getIfThenEvals()) {
+      visitIfThen(context, ifThenEval, stack);
+    }
+
+    context.append("ELSE");
+    if (evalNode.hasElse()) {
+      visit(context, evalNode.getElse(), stack);
+    }
+    stack.pop();
+    context.append("END");
+    return evalNode;
+  }
+
+  @Override
+  protected EvalNode visitIfThen(Context context, CaseWhenEval.IfThenEval 
evalNode, Stack<EvalNode> stack) {
+    stack.push(evalNode);
+    context.append("WHEN");
+    visit(context, evalNode.getCondition(), stack);
+    context.append("THEN");
+    visit(context, evalNode.getResult(), stack);
+    stack.pop();
+    return evalNode;
+  }
+
+  @Override
+  protected EvalNode visitFuncCall(Context context, FunctionEval func, 
Stack<EvalNode> stack) {
+    // TODO - TAJO-1837 should be resolved if we support RDBMS functions 
better.
+
+    stack.push(func);
+
+    context.sb.append(func.getName()).append("(");
+
+    boolean first = true;
+    for (EvalNode param : func.getArgs()) {
+      if (first) {
+        first = false;
+      } else {
+        context.sb.append(",");
+      }
+
+      visit(context, param, stack);
+    }
+
+    context.sb.append(")");
+    stack.pop();
+
+    return func;
+  }
+
+  @Override
+  protected EvalNode visitSubquery(Context context, SubqueryEval evalNode, 
Stack<EvalNode> stack) {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  /**
+   * convert Tajo literal into SQL representation
+   *
+   * @param d Datum
+   */
+  public String convertDatumToSQLLiteral(Datum d) {
+    switch (d.type()) {
+    case BOOLEAN:
+      return d.asBool() ? "TRUE" : "FALSE";
+
+    case INT1:
+    case INT2:
+    case INT4:
+    case INT8:
+    case FLOAT4:
+    case FLOAT8:
+    case NUMERIC:
+      return d.asChars();
+
+    case TEXT:
+    case VARCHAR:
+    case CHAR:
+      return quote(d.asChars());
+
+    case DATE:
+      return "DATE " + quote(d.asChars());
+
+    case TIME:
+      return "TIME " + quote(d.asChars());
+
+    case TIMESTAMP:
+      return "TIMESTAMP " + quote(d.asChars());
+
+    case NULL_TYPE:
+      return "NULL";
+
+    default:
+      throw new TajoRuntimeException(new 
UnsupportedDataTypeException(d.type().name()));
+    }
+  }
+
+  /**
+   * Convert Tajo DataType into SQL DataType
+   *
+   * @param dataType Tajo DataType
+   * @return SQL DataType
+   */
+  public String convertTajoTypeToSQLType(DataType dataType) {
+    switch (dataType.getType()) {
+    case INT1:
+      return "TINYINT";
+    case INT2:
+      return "SMALLINT";
+    case INT4:
+      return "INTEGER";
+    case INT8:
+      return "BIGINT";
+    case FLOAT4:
+      return "FLOAT";
+    case FLOAT8:
+      return "DOUBLE";
+    default:
+      return dataType.getType().name();
+    }
+  }
+
+  /**
+   * Convert EvalType the operator notation into SQL notation
+   *
+   * @param op EvalType
+   * @return SQL representation
+   */
+  public String convertBinOperatorToSQLRepr(EvalType op) {
+    return op.getOperatorName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto 
b/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto
new file mode 100644
index 0000000..f642e07
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/proto/JdbcFragmentProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.jdbc";
+option java_outer_classname = "JdbcFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message JdbcFragmentProto {
+  required string uri = 1;
+  required string input_source_id = 2;
+  repeated string hosts = 3;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
 
b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
new file mode 100644
index 0000000..a04209f
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-jdbc/src/main/test/java/org/apache/tajo/storage/jdbc/TestConnectionInfo.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestConnectionInfo {
+  @Test
+  public final void testGetConnectionInfoType1() {
+    ConnectionInfo c1 = 
ConnectionInfo.fromURI("jdbc:mysql://localhost:55840?user=testuser&password=testpass");
+    assertEquals("jdbc:mysql", c1.scheme);
+    assertEquals("localhost", c1.host);
+    assertEquals("testuser", c1.user);
+    assertEquals("testpass", c1.password);
+    assertNull(c1.dbName);
+    assertNull(c1.tableName);
+    assertEquals(0, c1.params.size());
+  }
+
+  @Test
+  public final void testGetConnectionInfoType2() {
+    ConnectionInfo c1 = ConnectionInfo.fromURI(
+        
"jdbc:mysql://localhost:55840/db1?table=tb1&user=testuser&password=testpass&TZ=GMT+9");
+    assertEquals("jdbc:mysql", c1.scheme);
+    assertEquals("localhost", c1.host);
+    assertEquals("testuser", c1.user);
+    assertEquals("testpass", c1.password);
+    assertEquals("db1", c1.dbName);
+    assertEquals("tb1", c1.tableName);
+    assertEquals(1, c1.params.size());
+    assertEquals("GMT+9", c1.params.get("TZ"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-pgsql/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-pgsql/pom.xml 
b/tajo-storage/tajo-storage-pgsql/pom.xml
new file mode 100644
index 0000000..c36d9bf
--- /dev/null
+++ b/tajo-storage/tajo-storage-pgsql/pom.xml
@@ -0,0 +1,257 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-pgsql</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo PostgreSQL JDBC storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>derby.log</exclude>
+            <exclude>src/test/resources/dataset/**</exclude>
+            <exclude>src/test/resources/queries/**</exclude>
+            <exclude>src/test/resources/results/**</exclude>
+            <exclude>src/test/resources/pgsql/**</exclude>
+          </excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m 
-Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-jdbc</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-cluster-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>testing-postgresql-server</artifactId>
+      <version>0.3</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>15.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java
 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java
new file mode 100644
index 0000000..2fa9f62
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLJdbcScanner.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.pgsql;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.jdbc.JdbcFragment;
+import org.apache.tajo.storage.jdbc.JdbcScanner;
+
+import java.sql.DatabaseMetaData;
+import java.util.Properties;
+
+public class PgSQLJdbcScanner extends JdbcScanner {
+
+  public PgSQLJdbcScanner(DatabaseMetaData dbMetaData,
+                          Properties connProperties,
+                          Schema tableSchema,
+                          TableMeta tableMeta,
+                          JdbcFragment fragment) {
+    super(dbMetaData, connProperties, tableSchema, tableMeta, fragment);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java
 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java
new file mode 100644
index 0000000..069ffe3
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLMetadataProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.pgsql;
+
+import org.apache.tajo.storage.jdbc.JdbcMetadataProviderBase;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+public class PgSQLMetadataProvider extends JdbcMetadataProviderBase {
+
+  public PgSQLMetadataProvider(PgSQLTablespace space, String dbName) {
+    super(space, dbName);
+  }
+
+  @Override
+  protected String getJdbcDriverName() {
+    return "org.postgresql.Driver";
+  }
+
+  @Override
+  public Collection<String> getTables(@Nullable String schemaPattern, 
@Nullable String tablePattern) {
+    return super.getTables("public", tablePattern);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java
 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java
new file mode 100644
index 0000000..1969d13
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-pgsql/src/main/java/org/apache/tajo/storage/pgsql/PgSQLTablespace.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.pgsql;
+
+import net.minidev.json.JSONObject;
+import org.apache.tajo.catalog.MetadataProvider;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.storage.NullScanner;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.jdbc.JdbcFragment;
+import org.apache.tajo.storage.jdbc.JdbcTablespace;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Postgresql Database Tablespace
+ */
+public class PgSQLTablespace extends JdbcTablespace {
+
+  public PgSQLTablespace(String name, URI uri, JSONObject config) {
+    super(name, uri, config);
+  }
+
+  public MetadataProvider getMetadataProvider() {
+    return new PgSQLMetadataProvider(this, database);
+  }
+
+  @Override
+  public Scanner getScanner(TableMeta meta,
+                            Schema schema,
+                            Fragment fragment,
+                            @Nullable Schema target) throws IOException {
+    if (!(fragment instanceof JdbcFragment)) {
+      throw new TajoInternalError("fragment must be JdbcFragment");
+    }
+
+    if (target == null) {
+      target = schema;
+    }
+
+    Scanner scanner;
+    if (fragment.isEmpty()) {
+      scanner = new NullScanner(conf, schema, meta, fragment);
+    } else {
+      scanner = new PgSQLJdbcScanner(getDatabaseMetaData(), connProperties, 
schema, meta, (JdbcFragment) fragment);
+    }
+    scanner.setTarget(target.toArray());
+    return scanner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0e4ad563/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java
----------------------------------------------------------------------
diff --git 
a/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java
 
b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java
new file mode 100644
index 0000000..024b497
--- /dev/null
+++ 
b/tajo-storage/tajo-storage-pgsql/src/test/java/org/apache/tajo/storage/pgsql/PgSQLTestServer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.pgsql;
+
+import com.google.common.base.Optional;
+import io.airlift.testing.postgresql.TestingPostgreSqlServer;
+import net.minidev.json.JSONObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.jdbc.JdbcTablespace;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.JavaResourceUtil;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+public class PgSQLTestServer {
+  private static final Log LOG = LogFactory.getLog(PgSQLTestServer.class);
+
+  private static PgSQLTestServer instance;
+
+  public static final String [] TPCH_TABLES = {
+      "customer", "lineitem", "nation", "orders", "part", "partsupp", 
"region", "supplier"
+  };
+
+  public static final String SPACENAME = "pgsql_cluster";
+  public static final String DATABASE_NAME = "tpch";
+  public static final String USERNAME = "testuser";
+  public static final String PASSWORD = "";
+
+  private final TestingPostgreSqlServer server;
+
+  static {
+    try {
+      instance = new PgSQLTestServer();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static PgSQLTestServer getInstance() {
+    return instance;
+  }
+
+  private PgSQLTestServer() throws Exception {
+    server = new TestingPostgreSqlServer(USERNAME,
+        "tpch"
+    );
+
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          server.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }));
+
+    loadTPCHTables();
+    registerTablespace();
+  }
+
+  Path testPath = CommonTestingUtil.getTestDir();
+
+  private void loadTPCHTables() throws SQLException, IOException {
+
+
+    try (Connection connection = 
DriverManager.getConnection(getJdbcUrlForAdmin(), "postgres", null)) {
+      connection.setCatalog("tpch");
+
+      try (Statement statement = connection.createStatement()) {
+
+        for (String tableName : TPCH_TABLES) {
+          String sql = JavaResourceUtil.readTextFromResource("pgsql/" + 
tableName + ".sql");
+          statement.executeUpdate(sql);
+
+          // restore the table contents into a file stored in a local file 
system for PgSQL COPY command
+          String path = restoreTableContents(tableName);
+          String copyCommand = genLoadStatement(tableName, path);
+          statement.executeUpdate(copyCommand);
+        }
+
+        // load DATETIME_TYPES table
+        String sql = 
JavaResourceUtil.readTextFromResource("pgsql/datetime_types.sql");
+        statement.executeUpdate(sql);
+        Path filePath = new Path(testPath, "datetime_types.txt");
+        storeTableContents("pgsql/datetime_types.txt", filePath);
+        String copyCommand = genLoadStatement("datetime_types", 
filePath.toUri().getPath());
+        LOG.info(copyCommand);
+        statement.executeUpdate(copyCommand);
+
+      } catch (Throwable t) {
+        t.printStackTrace();
+        throw t;
+      }
+    }
+  }
+
+  private String genLoadStatement(String tableName, String path) {
+    return  "COPY " + tableName + " FROM '" + path + "' WITH (FORMAT csv, 
DELIMITER '|');";
+  }
+
+  private void storeTableContents(String resource, Path path) throws 
IOException {
+    String csvTable = JavaResourceUtil.readTextFromResource(resource);
+    String fixedCsvTable = fixExtraColumn(csvTable);
+    FileUtil.writeTextToFile(fixedCsvTable, path);
+  }
+
+  private String restoreTableContents(String tableName) throws IOException {
+    Path filePath = new Path(testPath, tableName + ".tbl");
+    storeTableContents("tpch/" + tableName + ".tbl", filePath);
+    return filePath.toUri().getPath();
+  }
+
+  private String fixExtraColumn(String csvTable) {
+    final String [] lines = csvTable.split("\n");
+    final StringBuilder rewritten = new StringBuilder();
+
+    for (String l : lines) {
+      if (l.charAt(l.length() - 1) == '|') {
+        rewritten.append(l.substring(0, l.length() - 1));
+      } else {
+        rewritten.append(l.substring(0, l.length()));
+      }
+      rewritten.append("\n");
+    }
+
+    return rewritten.toString();
+  }
+
+  private void registerTablespace() throws IOException {
+    JSONObject configElements = new JSONObject();
+    configElements.put(JdbcTablespace.CONFIG_KEY_MAPPED_DATABASE, 
DATABASE_NAME);
+
+    PgSQLTablespace tablespace = new PgSQLTablespace(SPACENAME, 
URI.create(getJdbcUrlForAdmin()), configElements);
+    tablespace.init(new TajoConf());
+
+    TablespaceManager.addTableSpaceForTest(tablespace);
+  }
+
+  /**
+   * get JDBC URL for a created user
+   *
+   * @return JDBC URL for the created user
+   */
+  public String getJdbcUrl() {
+    return server.getJdbcUrl() + "&connectTimeout=5&socketTimeout=5";
+  }
+
+  /**
+   * get JDBC URL for the Admin user
+   *
+   * @return JDBC URL for the Admin user
+   */
+  public String getJdbcUrlForAdmin() {
+    // replace 'user' by postgres (admin)
+    String url = server.getJdbcUrl().split("\\?")[0];
+    url += "?user=postgres&connectTimeout=5&socketTimeout=5";
+    return url;
+  }
+
+  public TestingPostgreSqlServer getServer() {
+    return server;
+  }
+
+  public static Optional<Tablespace> 
resetAllParamsAndSetConnProperties(Map<String, String> connProperties)
+      throws IOException {
+    String uri = PgSQLTestServer.getInstance().getJdbcUrl().split("\\?")[0];
+
+    JSONObject configElements = new JSONObject();
+    configElements.put(JdbcTablespace.CONFIG_KEY_MAPPED_DATABASE, 
PgSQLTestServer.DATABASE_NAME);
+
+    JSONObject connPropertiesJson = new JSONObject();
+    for (Map.Entry<String, String> entry : connProperties.entrySet()) {
+      connPropertiesJson.put(entry.getKey(), entry.getValue());
+    }
+    configElements.put(JdbcTablespace.CONFIG_KEY_CONN_PROPERTIES, 
connPropertiesJson);
+
+    PgSQLTablespace tablespace = new 
PgSQLTablespace(PgSQLTestServer.SPACENAME, URI.create(uri), configElements);
+    tablespace.init(new TajoConf());
+    return TablespaceManager.addTableSpaceForTest(tablespace);
+  }
+}

Reply via email to