TAJO-1109: Separate SQL Statements from Catalog Stores. (Jihun Kang via hyunsik)

Closes #201


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/00555685
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/00555685
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/00555685

Branch: refs/heads/hbase_storage
Commit: 005556850e60f14d5e11ca74035b195461f31d29
Parents: 2b8d30c
Author: Hyunsik Choi <[email protected]>
Authored: Tue Nov 11 14:16:19 2014 -0800
Committer: Hyunsik Choi <[email protected]>
Committed: Tue Nov 11 14:16:19 2014 -0800

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-catalog/tajo-catalog-server/pom.xml        |   6 +
 .../tajo/catalog/store/AbstractDBStore.java     |  96 ++-
 .../apache/tajo/catalog/store/DerbyStore.java   | 330 +--------
 .../apache/tajo/catalog/store/OracleStore.java  | 337 +--------
 .../tajo/catalog/store/PostgreSQLStore.java     | 293 +-------
 .../catalog/store/XMLCatalogSchemaManager.java  | 698 +++++++++++++++++++
 .../tajo/catalog/store/object/BaseSchema.java   |  76 ++
 .../catalog/store/object/DatabaseObject.java    |  80 +++
 .../store/object/DatabaseObjectType.java        |  48 ++
 .../tajo/catalog/store/object/SQLObject.java    |  52 ++
 .../tajo/catalog/store/object/SchemaPatch.java  |  78 +++
 .../tajo/catalog/store/object/StoreObject.java  |  87 +++
 .../resources/schemas/DBMSSchemaDefinition.xsd  | 177 +++++
 .../main/resources/schemas/derby/columns.sql    |   8 -
 .../main/resources/schemas/derby/databases.sql  |   6 -
 .../resources/schemas/derby/databases_idx.sql   |   1 -
 .../src/main/resources/schemas/derby/derby.xml  | 186 +++++
 .../main/resources/schemas/derby/indexes.sql    |  12 -
 .../schemas/derby/partition_methods.sql         |   6 -
 .../main/resources/schemas/derby/partitions.sql |  10 -
 .../src/main/resources/schemas/derby/stats.sql  |   6 -
 .../schemas/derby/table_properties.sql          |   6 -
 .../src/main/resources/schemas/derby/tables.sql |  10 -
 .../resources/schemas/derby/tablespaces.sql     |   7 -
 .../main/resources/schemas/oracle/oracle.xml    | 218 ++++++
 .../resources/schemas/postgresql/indexes.sql    |  14 -
 .../resources/schemas/postgresql/postgresql.xml | 203 ++++++
 .../main/resources/schemas/postgresql/stats.sql |   6 -
 .../store/TestXMLCatalogSchemaManager.java      | 496 +++++++++++++
 .../schemas/derbytest/loadtest/derby.xml        | 191 +++++
 .../derbytest/mergetest/base_version_1.xml      |  35 +
 .../derbytest/mergetest/base_version_2.xml      |  63 ++
 .../schemas/derbytest/querytest/derby.xml       |  78 +++
 .../derbytest/upgradetest/base_version_2.xml    |  57 ++
 35 files changed, 2934 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d71905b..1337f87 100644
--- a/CHANGES
+++ b/CHANGES
@@ -13,6 +13,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
    
+    TAJO-1109: Separate SQL Statements from Catalog Stores.
+    (Jihun Kang via hyunsik)
+
     TAJO-1161: Remove joda time dependency from tajo-core. 
     (Jihun Kang via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/pom.xml 
b/tajo-catalog/tajo-catalog-server/pom.xml
index f811150..a2801e6 100644
--- a/tajo-catalog/tajo-catalog-server/pom.xml
+++ b/tajo-catalog/tajo-catalog-server/pom.xml
@@ -171,6 +171,12 @@
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
     </dependency>
+    <dependency>
+       <groupId>org.hamcrest</groupId>
+       <artifactId>hamcrest-library</artifactId>
+       <version>1.3</version>
+       <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index bcf6774..7c1baab 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -22,6 +22,7 @@
 package org.apache.tajo.catalog.store;
 
 import com.google.protobuf.InvalidProtocolBufferException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.Pair;
 
@@ -58,21 +58,40 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
   protected final String catalogUri;
 
   private Connection conn;
-
+  
   protected Map<String, Boolean> baseTableMaps = new HashMap<String, 
Boolean>();
+  
+  protected XMLCatalogSchemaManager catalogSchemaManager;
 
   protected abstract String getCatalogDriverName();
+  
+  protected String getCatalogSchemaPath() {
+    return "";
+  }
 
   protected abstract Connection createConnection(final Configuration conf) 
throws SQLException;
+  
+  protected void createDatabaseDependants() throws CatalogException {
+    
+  }
+  
+  protected boolean isInitialized() throws CatalogException {
+    return catalogSchemaManager.isInitialized(getConnection());
+  }
 
-  protected abstract boolean isInitialized() throws CatalogException;
-
-  protected abstract void createBaseTable() throws CatalogException;
+  protected void createBaseTable() throws CatalogException {
+    createDatabaseDependants();
+    
+    catalogSchemaManager.createBaseSchema(getConnection());
+    
+    insertSchemaVersion();
+  }
 
-  protected abstract void dropBaseTable() throws CatalogException;
+  protected void dropBaseTable() throws CatalogException {
+    catalogSchemaManager.dropBaseSchema(getConnection());
+  }
 
   public AbstractDBStore(Configuration conf) throws InternalException {
-
     this.conf = conf;
 
     if (conf.get(CatalogConstants.DEPRECATED_CATALOG_URI) != null) {
@@ -116,6 +135,11 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
           + ")", e);
     }
 
+    String schemaPath = getCatalogSchemaPath();
+    if (schemaPath != null && !schemaPath.isEmpty()) {
+      this.catalogSchemaManager = new XMLCatalogSchemaManager(schemaPath);
+    }
+    
     try {
       if (isInitialized()) {
         LOG.info("The base tables of CatalogServer already is initialized.");
@@ -138,7 +162,9 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
     }
   }
 
-  public abstract int getDriverVersion();
+  public int getDriverVersion() {
+    return catalogSchemaManager.getCatalogStore().getSchema().getVersion();
+  }
 
   public String readSchemaFile(String path) throws CatalogException {
     try {
@@ -176,42 +202,52 @@ public abstract class AbstractDBStore extends 
CatalogConstants implements Catalo
     return conn;
   }
 
-  private void verifySchemaVersion() throws CatalogException {
+  private int getSchemaVersion() {
     Connection conn = null;
     PreparedStatement pstmt = null;
     ResultSet result = null;
+    int schemaVersion = -1;
+    
+    String sql = "SELECT version FROM META";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql.toString());
+    }
 
     try {
-      String sql = "SELECT version FROM META";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql.toString());
-      }
-
       conn = getConnection();
       pstmt = conn.prepareStatement(sql);
       result = pstmt.executeQuery();
 
-      boolean noVersion = !result.next();
-
-      int schemaVersion = result.getInt(1);
-      if (noVersion || schemaVersion != getDriverVersion()) {
-        LOG.error(String.format("Catalog version (%d) and current driver 
version (%d) are mismatch to each other",
-            schemaVersion, getDriverVersion()));
-        
LOG.error("=========================================================================");
-        LOG.error("|                Catalog Store Migration Is Needed          
            |");
-        
LOG.error("=========================================================================");
-        LOG.error("| You might downgrade or upgrade Apache Tajo. Downgrading 
or upgrading  |");
-        LOG.error("| Tajo without migration process is only available in some 
versions.    |");
-        LOG.error("| In order to learn how to migration Apache Tajo instance,  
            |");
-        LOG.error("| please refer http://s.apache.org/0_8_migration.           
            |");
-        
LOG.error("=========================================================================");
-        throw new CatalogException("Migration Needed. Please refer http://s.apache.org/0_8_migration.");
+      if (result.next()) {
+        schemaVersion = result.getInt("VERSION");
       }
     } catch (SQLException e) {
-      throw new CatalogException(e);
+      throw new CatalogException(e.getMessage(), e);
     } finally {
       CatalogUtil.closeQuietly(pstmt, result);
     }
+    
+    return schemaVersion;
+  }
+
+  private void verifySchemaVersion() throws CatalogException {
+    int schemaVersion = -1;
+
+    schemaVersion = getSchemaVersion();
+
+    if (schemaVersion == -1 || schemaVersion != getDriverVersion()) {
+      LOG.error(String.format("Catalog version (%d) and current driver version 
(%d) are mismatch to each other",
+          schemaVersion, getDriverVersion()));
+      
LOG.error("=========================================================================");
+      LOG.error("| Catalog Store Migration Is Needed |");
+      
LOG.error("=========================================================================");
+      LOG.error("| You might downgrade or upgrade Apache Tajo. Downgrading or 
upgrading |");
+      LOG.error("| Tajo without migration process is only available in some 
versions. |");
+      LOG.error("| In order to learn how to migration Apache Tajo instance, 
|");
+      LOG.error("| please refer http://s.apache.org/0_8_migration. |");
+      
LOG.error("=========================================================================");
+      throw new CatalogException("Migration Needed. Please refer http://s.apache.org/0_8_migration.");
+    }
 
     LOG.info(String.format("The compatibility of the catalog schema (version: 
%d) has been verified.",
         getDriverVersion()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index d6f9fc3..4a1053a 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -27,15 +27,9 @@ import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.exception.InternalException;
 
 import java.sql.*;
-import java.util.HashMap;
-import java.util.Map;
 
 public class DerbyStore extends AbstractDBStore {
 
-  /** 2014-03-20: First versioning */
-  private static final int DERBY_STORE_VERSION_2 = 2;
-  /** Before 2013-03-20 */
-  private static final int DERBY_STORE_VERSION_1 = 1;
   private static final String 
CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
 
   protected String getCatalogDriverName(){
@@ -46,10 +40,6 @@ public class DerbyStore extends AbstractDBStore {
     super(conf);
   }
 
-  public int getDriverVersion() {
-    return DERBY_STORE_VERSION_2;
-  }
-
   protected Connection createConnection(Configuration conf) throws 
SQLException {
     return DriverManager.getConnection(getCatalogUri());
   }
@@ -59,306 +49,6 @@ public class DerbyStore extends AbstractDBStore {
     return super.readSchemaFile("derby/" + filename);
   }
 
-  // TODO - DDL and index statements should be renamed
-  protected void createBaseTable() throws CatalogException {
-    Connection conn = null;
-    Statement stmt = null;
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-      //META
-      if (!baseTableMaps.get(TB_META)) {
-
-        String sql = super.readSchemaFile("common/meta.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-
-        LOG.info("Table '" + TB_META + "' is created.");
-        baseTableMaps.put(TB_META, true);
-      }
-
-      // TABLE SPACES
-      if (!baseTableMaps.get(TB_SPACES)) {
-
-        String sql = readSchemaFile("tablespaces.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-
-        LOG.info("Table '" + TB_SPACES + "' is created.");
-        baseTableMaps.put(TB_SPACES, true);
-      }
-
-      // DATABASES
-      if (!baseTableMaps.get(TB_DATABASES)) {
-        String sql = readSchemaFile("databases.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        sql = readSchemaFile("databases_idx.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_DATABASES + "' is created.");
-        baseTableMaps.put(TB_DATABASES, true);
-      }
-
-      // TABLES
-      if (!baseTableMaps.get(TB_TABLES)) {
-
-        String sql = readSchemaFile("tables.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-        sql = "CREATE UNIQUE INDEX idx_tables_tid on TABLES (TID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-
-        sql = "CREATE UNIQUE INDEX idx_tables_name on TABLES (DB_ID, 
TABLE_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_TABLES + "' is created.");
-        baseTableMaps.put(TB_TABLES, true);
-      }
-
-      // COLUMNS
-      if (!baseTableMaps.get(TB_COLUMNS)) {
-        String sql = readSchemaFile("columns.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        sql = "CREATE UNIQUE INDEX idx_fk_columns_table_name on " + TB_COLUMNS 
+ "(TID, COLUMN_NAME)";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_COLUMNS + " is created.");
-        baseTableMaps.put(TB_COLUMNS, true);
-      }
-
-      // OPTIONS
-      if (!baseTableMaps.get(TB_OPTIONS)) {
-        String sql = readSchemaFile("table_properties.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-        sql = "CREATE INDEX idx_options_key on " + TB_OPTIONS + "(TID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        stmt.executeBatch();
-
-        LOG.info("Table '" + TB_OPTIONS + " is created.");
-        baseTableMaps.put(TB_OPTIONS, true);
-      }
-
-      // INDEXES
-      if (!baseTableMaps.get(TB_INDEXES)) {
-        String sql = readSchemaFile("indexes.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        sql = "CREATE UNIQUE INDEX idx_indexes_pk ON " + TB_INDEXES + "(" + 
COL_DATABASES_PK + ",index_name)";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        sql = "CREATE INDEX idx_indexes_columns ON " + TB_INDEXES + "(" + 
COL_DATABASES_PK + ",column_name)";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_INDEXES + "' is created.");
-        baseTableMaps.put(TB_INDEXES, true);
-      }
-
-      if (!baseTableMaps.get(TB_STATISTICS)) {
-        String sql = readSchemaFile("stats.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-        sql = "CREATE UNIQUE INDEX idx_stats_table_name ON " + TB_STATISTICS + 
"(TID)";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_STATISTICS + "' is created.");
-        baseTableMaps.put(TB_STATISTICS, true);
-      }
-
-      // PARTITION_METHODS
-      if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
-        String sql = readSchemaFile("partition_methods.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-        sql = "CREATE INDEX idx_partition_methods_table_id ON " + 
TB_PARTITION_METHODS + "(TID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
-        baseTableMaps.put(TB_PARTITION_METHODS, true);
-      }
-
-      // PARTITIONS
-      if (!baseTableMaps.get(TB_PARTTIONS)) {
-        String sql = readSchemaFile("partitions.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-
-        sql = "CREATE INDEX idx_partitions_table_name ON PARTITIONS(TID)";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
-        baseTableMaps.put(TB_PARTTIONS, true);
-      }
-
-      insertSchemaVersion();
-
-    } catch (SQLException se) {
-      throw new CatalogException("failed to create base tables for Derby 
catalog store.", se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-    }
-  }
-
-  @Override
-  protected void dropBaseTable() throws CatalogException {
-    Connection conn;
-    Statement stmt = null;
-    Map<String, Boolean> droppedTable = new HashMap<String, Boolean>();
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-      for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-        if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) {
-          String sql = "DROP TABLE " + entry.getKey();
-          stmt.addBatch(sql);
-          droppedTable.put(entry.getKey(), true);
-        }
-      }
-      if(baseTableMaps.get(TB_TABLES)) {
-        String sql = "DROP TABLE " + TB_TABLES;
-        stmt.addBatch(sql);
-        droppedTable.put(TB_TABLES, true);
-      }
-      stmt.executeBatch();
-
-      for(String tableName : droppedTable.keySet()) {
-        LOG.info("Table '" + tableName + "' is dropped");
-      }
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-    }
-  }
-
-  @Override
-  protected boolean isInitialized() throws CatalogException {
-    Connection conn;
-    ResultSet res = null;
-
-    try {
-      conn = getConnection();
-      res = conn.getMetaData().getTables(null, null, null,
-          new String [] {"TABLE"});
-
-      baseTableMaps.put(TB_META, false);
-      baseTableMaps.put(TB_SPACES, false);
-      baseTableMaps.put(TB_DATABASES, false);
-      baseTableMaps.put(TB_TABLES, false);
-      baseTableMaps.put(TB_COLUMNS, false);
-      baseTableMaps.put(TB_OPTIONS, false);
-      baseTableMaps.put(TB_STATISTICS, false);
-      baseTableMaps.put(TB_INDEXES, false);
-      baseTableMaps.put(TB_PARTITION_METHODS, false);
-      baseTableMaps.put(TB_PARTTIONS, false);
-
-      while (res.next()) {
-        baseTableMaps.put(res.getString("TABLE_NAME"), true);
-      }
-    } catch (SQLException se){
-      throw  new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(res);
-    }
-
-    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-      if (!entry.getValue()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-
   @Override
   public final void close() {
     Connection conn = null;
@@ -379,4 +69,24 @@ public class DerbyStore extends AbstractDBStore {
     }
     LOG.info("Shutdown database (" + catalogUri + ")");
   }
+
+  @Override
+  protected void createDatabaseDependants() throws CatalogException {
+    String schemaName = 
catalogSchemaManager.getCatalogStore().getSchema().getSchemaName();
+    Statement stmt = null;
+    
+    if (schemaName != null && !schemaName.isEmpty()) {
+      try {
+        stmt = getConnection().createStatement();
+        stmt.executeUpdate("CREATE SCHEMA " + schemaName);
+      } catch (SQLException e) {
+        throw new CatalogException(e);
+      }
+    }
+  }
+
+  @Override
+  protected String getCatalogSchemaPath() {
+    return "schemas/derby";
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java
index cea777e..45c153c 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java
@@ -15,25 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.tajo.catalog.store;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.exception.InternalException;
 
 public class OracleStore extends AbstractDBStore {
   
-  private static final int ORACLE_STORE_VERSION = 2;
   private static final String CATALOG_DRIVER = "oracle.jdbc.OracleDriver";
 
   public OracleStore(Configuration conf) throws InternalException {
@@ -46,335 +40,18 @@ public class OracleStore extends AbstractDBStore {
   }
 
   @Override
-  protected Connection createConnection(Configuration conf) throws 
SQLException {
-    return DriverManager.getConnection(getCatalogUri(), this.connectionId, 
this.connectionPassword);
-  }
-
-  @Override
-  protected boolean isInitialized() throws CatalogException {
-    Connection conn;
-    ResultSet res = null;
-
-    try {
-      conn = getConnection();
-      res = conn.getMetaData().getTables(null, 
-          this.connectionId != null?this.connectionId.toUpperCase():null, 
-              null, new String[] { "TABLE" });
-      
-      baseTableMaps.put(TB_META, false);
-      baseTableMaps.put(TB_SPACES, false);
-      baseTableMaps.put(TB_DATABASES, false);
-      baseTableMaps.put(TB_TABLES, false);
-      baseTableMaps.put(TB_COLUMNS, false);
-      baseTableMaps.put(TB_OPTIONS, false);
-      baseTableMaps.put(TB_STATISTICS, false);
-      baseTableMaps.put(TB_INDEXES, false);
-      baseTableMaps.put(TB_PARTITION_METHODS, false);
-      baseTableMaps.put(TB_PARTTIONS, false);
-      
-      while (res.next()) {
-        baseTableMaps.put(res.getString("TABLE_NAME"), true);
-      }
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(res);
-    }
-    
-    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-      if (!entry.getValue()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  protected void createBaseTable() throws CatalogException {
-    Statement stmt = null;
-    Connection conn = null;
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-
-      // META
-      if (!baseTableMaps.get(TB_META)) {
-        String sql = super.readSchemaFile("common/meta.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_META + " is created.");
-        baseTableMaps.put(TB_META, true);
-      }
-
-      // TABLE SPACES
-      if (!baseTableMaps.get(TB_SPACES)) {
-        String sql = readSchemaFile("tablespaces.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE SEQUENCE TABLESPACES_SEQ";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE OR REPLACE TRIGGER TABLESPACES_AUTOINC " +
-                "BEFORE INSERT ON TABLESPACES " +
-                "FOR EACH ROW " +
-                "WHEN (new.SPACE_ID IS NULL) " +
-                "BEGIN " +
-                "  SELECT TABLESPACES_SEQ.NEXTVAL INTO :new.SPACE_ID FROM 
DUAL; " +
-                "END;";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_SPACES + "' is created.");
-        baseTableMaps.put(TB_SPACES, true);
-      }
-
-      // DATABASES
-      if (!baseTableMaps.get(TB_DATABASES)) {
-        String sql = readSchemaFile("databases.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }        
-        stmt.addBatch(sql);
-        
-        sql = "CREATE SEQUENCE DATABASES__SEQ";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE OR REPLACE TRIGGER DATABASES__AUTOINC " +
-            "BEFORE INSERT ON DATABASES_ " +
-            "FOR EACH ROW " +
-            "WHEN (new.DB_ID IS NULL) " +
-            "BEGIN " +
-            "  SELECT DATABASES__SEQ.NEXTVAL INTO :new.DB_ID FROM DUAL; " +
-            "END;";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_DATABASES + "' is created.");
-        baseTableMaps.put(TB_DATABASES, true);
-      }
-
-      // TABLES
-      if (!baseTableMaps.get(TB_TABLES)) {
-        String sql = readSchemaFile("tables.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE SEQUENCE TABLES_SEQ";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE OR REPLACE TRIGGER TABLES_AUTOINC " +
-            "BEFORE INSERT ON TABLES " +
-            "FOR EACH ROW " +
-            "WHEN (new.TID IS NULL) " +
-            "BEGIN " +
-            "  SELECT TABLES_SEQ.NEXTVAL INTO :new.TID FROM DUAL; " +
-            "END;";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX TABLES_IDX_DB_ID on TABLES (DB_ID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE UNIQUE INDEX TABLES_IDX_TABLE_ID on TABLES (DB_ID, 
TABLE_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_TABLES + "' is created.");
-        baseTableMaps.put(TB_TABLES, true);
-      }
-
-      // COLUMNS
-      if (!baseTableMaps.get(TB_COLUMNS)) {
-        String sql = readSchemaFile("columns.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_COLUMNS + " is created.");
-        baseTableMaps.put(TB_COLUMNS, true);
-      }
-
-      // OPTIONS
-      if (!baseTableMaps.get(TB_OPTIONS)) {
-        String sql = readSchemaFile("table_properties.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_OPTIONS + " is created.");
-        baseTableMaps.put(TB_OPTIONS, true);
-      }
-
-      // INDEXES
-      if (!baseTableMaps.get(TB_INDEXES)) {
-        String sql = readSchemaFile("indexes.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, 
COLUMN_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_INDEXES + "' is created.");
-        baseTableMaps.put(TB_INDEXES, true);
-      }
-
-      if (!baseTableMaps.get(TB_STATISTICS)) {
-        String sql = readSchemaFile("stats.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_STATISTICS + "' is created.");
-        baseTableMaps.put(TB_STATISTICS, true);
-      }
-
-      // PARTITION_METHODS
-      if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
-        String sql = readSchemaFile("partition_methods.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
-        baseTableMaps.put(TB_PARTITION_METHODS, true);
-      }
-
-      // PARTITIONS
-      if (!baseTableMaps.get(TB_PARTTIONS)) {
-        String sql = readSchemaFile("partitions.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX PARTITIONS_IDX_TID on PARTITIONS (TID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
-        baseTableMaps.put(TB_PARTTIONS, true);
-      }
-
-      insertSchemaVersion();
-
-    } catch (SQLException se) {
-      throw new CatalogException("failed to create base tables for Oracle 
catalog store", se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-    }
+  protected String getCatalogSchemaPath() {
+    return "schemas/oracle";
   }
 
   @Override
-  protected void dropBaseTable() throws CatalogException {
-    Connection conn;
-    Statement stmt = null;
-    PreparedStatement dropSequence = null;
-    Map<String, Boolean> droppedTable = new HashMap<String, Boolean>();
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-      for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-        if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) {
-          String sql = "DROP TABLE " + entry.getKey();
-          stmt.addBatch(sql);
-          droppedTable.put(entry.getKey(), true);
-        }
-      }
-      if(baseTableMaps.get(TB_TABLES)) {
-        String sql = "DROP TABLE " + TB_TABLES;
-        stmt.addBatch(sql);
-        droppedTable.put(TB_TABLES, true);
-      }
-      stmt.executeBatch();
-      
-      for(String tableName : droppedTable.keySet()) {
-        LOG.info("Table '" + tableName + "' is dropped");
-      }
-      
-      // Drop sequences
-      dropSequence = conn.prepareStatement("SELECT COUNT(*) FROM 
USER_SEQUENCES WHERE SEQUENCE_NAME = ?");
-      for(String tableName : droppedTable.keySet()) {
-        String possibleSeqName = tableName + "_SEQ";
-        dropSequence.setString(1, possibleSeqName);
-        ResultSet rs = dropSequence.executeQuery();
-        
-        if (rs.next() && rs.getInt(1) > 0) {
-          String sql = "DROP SEQUENCE " + possibleSeqName;
-          stmt.execute(sql);
-          LOG.info("Sequence '" + possibleSeqName + "' is dropped");
-        }
-      }
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-      CatalogUtil.closeQuietly(dropSequence);
-    }
+  protected Connection createConnection(Configuration conf) throws 
SQLException {
+    return DriverManager.getConnection(getCatalogUri(), this.connectionId, 
this.connectionPassword);
   }
 
   @Override
-  public int getDriverVersion() {
-    return ORACLE_STORE_VERSION;
+  protected void createDatabaseDependants() throws CatalogException {
+    
   }
 
-  @Override
-  public String readSchemaFile(String path) throws CatalogException {
-    return super.readSchemaFile("oracle/" + path);
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java
index a06cf19..41f2909 100644
--- 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java
@@ -15,26 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.tajo.catalog.store;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.exception.InternalException;
 
 public class PostgreSQLStore extends AbstractDBStore {
   
-  private static final int POSTGRESQL_STORE_VERSION = 2;
   private static final String CATALOG_DRIVER = "org.postgresql.Driver";
-
+  
   public PostgreSQLStore(Configuration conf) throws InternalException {
     super(conf);
   }
@@ -43,290 +38,20 @@ public class PostgreSQLStore extends AbstractDBStore {
   protected String getCatalogDriverName() {
     return CATALOG_DRIVER;
   }
+  
+  @Override
+  protected String getCatalogSchemaPath() {
+    return "schemas/postgresql";
+  }
 
   @Override
   protected Connection createConnection(Configuration conf) throws 
SQLException {
     return DriverManager.getConnection(getCatalogUri(), this.connectionId, 
this.connectionPassword);
   }
-
+  
   @Override
-  protected boolean isInitialized() throws CatalogException {
-    Connection conn;
-    ResultSet res = null;
-
-    try {
-      conn = getConnection();
-      res = conn.getMetaData().getTables(null, null, null, new String[] { 
"TABLE" });
-      
-      baseTableMaps.put(TB_META, false);
-      baseTableMaps.put(TB_SPACES, false);
-      baseTableMaps.put(TB_DATABASES, false);
-      baseTableMaps.put(TB_TABLES, false);
-      baseTableMaps.put(TB_COLUMNS, false);
-      baseTableMaps.put(TB_OPTIONS, false);
-      baseTableMaps.put(TB_STATISTICS, false);
-      baseTableMaps.put(TB_INDEXES, false);
-      baseTableMaps.put(TB_PARTITION_METHODS, false);
-      baseTableMaps.put(TB_PARTTIONS, false);
-      
-      while (res.next()) {
-        baseTableMaps.put(res.getString("TABLE_NAME").toUpperCase(), true);
-      }
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(res);
-    }
+  protected void createDatabaseDependants() throws CatalogException {
     
-    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-      if (!entry.getValue()) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  protected void createBaseTable() throws CatalogException {
-    Statement stmt = null;
-    Connection conn = null;
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-
-      // META
-      if (!baseTableMaps.get(TB_META)) {
-        String sql = super.readSchemaFile("common/meta.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql.toString());
-        LOG.info("Table '" + TB_META + " is created.");
-        baseTableMaps.put(TB_META, true);
-      }
-
-      // TABLE SPACES
-      if (!baseTableMaps.get(TB_SPACES)) {
-        String sql = readSchemaFile("tablespaces.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE UNIQUE INDEX TABLESPACES_IDX_NAME on TABLESPACES 
(SPACE_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_SPACES + "' is created.");
-        baseTableMaps.put(TB_SPACES, true);
-      }
-
-      // DATABASES
-      if (!baseTableMaps.get(TB_DATABASES)) {
-        String sql = readSchemaFile("databases.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }        
-        stmt.addBatch(sql);
-        
-        sql = "CREATE UNIQUE INDEX DATABASES__IDX_NAME on DATABASES_ 
(DB_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_DATABASES + "' is created.");
-        baseTableMaps.put(TB_DATABASES, true);
-      }
-
-      // TABLES
-      if (!baseTableMaps.get(TB_TABLES)) {
-        String sql = readSchemaFile("tables.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX TABLES_IDX_DB_ID on TABLES (DB_ID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE UNIQUE INDEX TABLES_IDX_TABLE_ID on TABLES (DB_ID, 
TABLE_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_TABLES + "' is created.");
-        baseTableMaps.put(TB_TABLES, true);
-      }
-
-      // COLUMNS
-      if (!baseTableMaps.get(TB_COLUMNS)) {
-        String sql = readSchemaFile("columns.sql");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_COLUMNS + " is created.");
-        baseTableMaps.put(TB_COLUMNS, true);
-      }
-
-      // OPTIONS
-      if (!baseTableMaps.get(TB_OPTIONS)) {
-        String sql = readSchemaFile("table_properties.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_OPTIONS + " is created.");
-        baseTableMaps.put(TB_OPTIONS, true);
-      }
-
-      // INDEXES
-      if (!baseTableMaps.get(TB_INDEXES)) {
-        String sql = readSchemaFile("indexes.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql.toString());
-        
-        sql = "CREATE UNIQUE INDEX INDEXES_IDX_DB_ID_NAME on INDEXES (DB_ID, 
INDEX_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, 
COLUMN_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_INDEXES + "' is created.");
-        baseTableMaps.put(TB_INDEXES, true);
-      }
-
-      if (!baseTableMaps.get(TB_STATISTICS)) {
-        String sql = readSchemaFile("stats.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
-        }
-
-        stmt.executeUpdate(sql.toString());
-        LOG.info("Table '" + TB_STATISTICS + "' is created.");
-        baseTableMaps.put(TB_STATISTICS, true);
-      }
-
-      // PARTITION_METHODS
-      if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
-        String sql = readSchemaFile("partition_methods.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        stmt.executeUpdate(sql);
-        LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
-        baseTableMaps.put(TB_PARTITION_METHODS, true);
-      }
-
-      // PARTITIONS
-      if (!baseTableMaps.get(TB_PARTTIONS)) {
-        String sql = readSchemaFile("partitions.sql");
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE INDEX PARTITIONS_IDX_TID on PARTITIONS (TID)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        sql = "CREATE UNIQUE INDEX IDX_TID_NAME on PARTITIONS (TID, 
PARTITION_NAME)";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.addBatch(sql);
-        
-        stmt.executeBatch();
-        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
-        baseTableMaps.put(TB_PARTTIONS, true);
-      }
-
-      insertSchemaVersion();
-
-    } catch (SQLException se) {
-      throw new CatalogException("failed to create base tables for PostgreSQL 
catalog store", se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-    }
-  }
-
-  @Override
-  protected void dropBaseTable() throws CatalogException {
-    Connection conn;
-    Statement stmt = null;
-    Map<String, Boolean> droppedTable = new HashMap<String, Boolean>();
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-      for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
-        if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) {
-          String sql = "DROP TABLE " + entry.getKey();
-          stmt.addBatch(sql);
-          droppedTable.put(entry.getKey(), true);
-        }
-      }
-      if(baseTableMaps.get(TB_TABLES)) {
-        String sql = "DROP TABLE " + TB_TABLES;
-        stmt.addBatch(sql);
-        droppedTable.put(TB_TABLES, true);
-      }
-      stmt.executeBatch();
-
-      for(String tableName : droppedTable.keySet()) {
-        LOG.info("Table '" + tableName + "' is dropped");
-      }
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(stmt);
-    }
-  }
-
-  @Override
-  public int getDriverVersion() {
-    return POSTGRESQL_STORE_VERSION;
-  }
-
-  @Override
-  public String readSchemaFile(String path) throws CatalogException {
-    return super.readSchemaFile("postgresql/" + path);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java
new file mode 100644
index 0000000..b09bfa8
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java
@@ -0,0 +1,698 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.store.object.*;
+
+public class XMLCatalogSchemaManager {
+  protected final Log LOG = LogFactory.getLog(getClass());
+  private String schemaPath;
+  private final Unmarshaller unmarshaller;
+  private StoreObject catalogStore;
+  
+  /**
+   * Creates a schema manager for the given resource directory and immediately
+   * loads and merges every XML schema definition found under it.
+   *
+   * @param schemaPath classpath-relative directory holding the XML schema files
+   * @throws CatalogException wrapping any failure raised while creating the
+   *         JAXB unmarshaller or while loading the schema files
+   */
+  public XMLCatalogSchemaManager(String schemaPath) throws CatalogException {
+    this.schemaPath = schemaPath;
+    try {
+      unmarshaller = JAXBContext.newInstance(StoreObject.class).createUnmarshaller();
+      loadFromXmlFiles();
+    } catch (Exception cause) {
+      throw new CatalogException(cause.getMessage(), cause);
+    }
+  }
+  
+  protected String getDropSQL(DatabaseObjectType type, String name) 
+      throws CatalogException {
+    SQLObject foundDropQuery = null;
+    String sqlStatement = "DROP " + type.toString() + " " + name;
+    
+    for (SQLObject dropQuery: catalogStore.getDropStatements()) {
+      if (type == dropQuery.getType()) {
+        foundDropQuery = dropQuery;
+        break;
+      }
+    }
+    
+    if (foundDropQuery != null && foundDropQuery.getSql() != null && 
!foundDropQuery.getSql().isEmpty()) {
+      String dropStatement = foundDropQuery.getSql();
+      StringBuffer sqlBuffer = new 
StringBuffer(dropStatement.length()+name.length());
+      int identifier = dropStatement.indexOf('?');
+      
+      sqlBuffer.append(dropStatement.substring(0, identifier)).append(name)
+        .append(dropStatement.substring(identifier+1));
+      sqlStatement = sqlBuffer.toString();
+    }
+    
+    return sqlStatement;
+  }
+
+  /**
+   * Drops every TABLE, SEQUENCE and VIEW object declared in the loaded base schema.
+   *
+   * <p>A failure to drop a single object does not stop the loop; the objects that
+   * could not be dropped are collected and reported in one warning log entry.</p>
+   *
+   * @param conn connection used to execute the drop statements
+   * @throws CatalogException if the schema files are not loaded or a statement
+   *         cannot be created on {@code conn}
+   */
+  public void dropBaseSchema(Connection conn) throws CatalogException {
+    if (!isLoaded()) {
+      throw new CatalogException("Schema files are not loaded yet.");
+    }
+
+    List<DatabaseObject> failedObjects = new ArrayList<DatabaseObject>();
+    Statement stmt = null;
+
+    try {
+      stmt = conn.createStatement();
+    } catch (SQLException e) {
+      throw new CatalogException(e.getMessage(), e);
+    }
+
+    try {
+      for (DatabaseObject object: catalogStore.getSchema().getObjects()) {
+        try {
+          if (DatabaseObjectType.TABLE == object.getType() ||
+              DatabaseObjectType.SEQUENCE == object.getType() ||
+              DatabaseObjectType.VIEW == object.getType()) {
+            stmt.executeUpdate(getDropSQL(object.getType(), object.getName()));
+          }
+        } catch (SQLException e) {
+          // Best effort: remember the failure and keep dropping the remaining objects.
+          failedObjects.add(object);
+        }
+      }
+    } finally {
+      // The statement used to be leaked on every call.
+      CatalogUtil.closeQuietly(stmt);
+    }
+
+    if (failedObjects.size() > 0) {
+      StringBuilder errorMessage = new StringBuilder(64);
+      errorMessage.append("Failed to drop database objects ");
+
+      for (int idx = 0; idx < failedObjects.size(); idx++) {
+        DatabaseObject object = failedObjects.get(idx);
+        errorMessage.append(object.getType().toString()).append(" ")
+          .append(object.getName());
+        if ((idx+1) < failedObjects.size()) {
+          errorMessage.append(',');
+        }
+      }
+
+      LOG.warn(errorMessage.toString());
+    }
+  }
+  
+  /**
+   * Prepares the existence-check query declared for the given object type.
+   *
+   * @param conn connection on which the statement is prepared
+   * @param type object type whose existence query is wanted
+   * @return a prepared statement for the first query declared for {@code type},
+   *         or {@code null} when the loaded schema declares none
+   * @throws SQLException if the statement cannot be prepared
+   */
+  protected PreparedStatement getExistQuery(Connection conn, DatabaseObjectType type)
+      throws SQLException {
+    for (SQLObject candidate: catalogStore.getExistQueries()) {
+      if (type.equals(candidate.getType())) {
+        return conn.prepareStatement(candidate.getSql());
+      }
+    }
+
+    return null;
+  }
+  
+  /**
+   * Checks whether a database object exists in the underlying database.
+   *
+   * <p>DATA and FUNCTION objects are looked up through JDBC metadata. INDEX and
+   * TABLE objects use the existence query declared in the schema files when
+   * there is one and fall back to JDBC metadata otherwise. DOMAIN, OPERATOR,
+   * RULE, SEQUENCE, TRIGGER and VIEW objects require a declared existence query.</p>
+   *
+   * @param conn   connection used for the lookup
+   * @param type   type of the object to look for
+   * @param params object name; for INDEX, the table name followed by the index name
+   * @return {@code true} if the object was found
+   * @throws SQLException             if the lookup fails
+   * @throws IllegalArgumentException if no parameter is given, or INDEX is not
+   *                                  given exactly two parameters
+   * @throws CatalogException         if the type needs an existence query and
+   *                                  none is declared
+   */
+  protected boolean checkExistance(Connection conn, DatabaseObjectType type, String... params)
+      throws SQLException {
+    boolean result = false;
+    PreparedStatement pstmt = null;
+    ResultSet rs = null;
+    BaseSchema baseSchema = catalogStore.getSchema();
+
+    if (params == null || params.length < 1) {
+      throw new IllegalArgumentException("checkExistance function needs at least one argument.");
+    }
+
+    try {
+      switch(type) {
+      case DATA:
+        rs = conn.getMetaData().getUDTs(null, toSchemaPattern(baseSchema),
+            params[0].toUpperCase(), null);
+        result = rs.next();
+        break;
+      case FUNCTION:
+        rs = conn.getMetaData().getFunctions(null, toSchemaPattern(baseSchema),
+            params[0].toUpperCase());
+        result = rs.next();
+        break;
+      case INDEX:
+        if (params.length != 2) {
+          throw new IllegalArgumentException(
+              "Finding index object is needed two strings, table name and index name");
+        }
+
+        pstmt = getExistQuery(conn, type);
+        if (pstmt != null) {
+          result = checkExistanceByQuery(pstmt, baseSchema, params);
+        } else {
+          rs = conn.getMetaData().getIndexInfo(null, toSchemaPattern(baseSchema),
+              params[0].toUpperCase(), false, true);
+          while (rs.next()) {
+            // INDEX_NAME is null for statistics rows, so compare from the non-null side.
+            if (params[1].toUpperCase().equals(rs.getString("INDEX_NAME"))) {
+              result = true;
+              break;
+            }
+          }
+        }
+        break;
+      case TABLE:
+        pstmt = getExistQuery(conn, type);
+        if (pstmt != null) {
+          result = checkExistanceByQuery(pstmt, baseSchema, params);
+        } else {
+          rs = conn.getMetaData().getTables(null, toSchemaPattern(baseSchema),
+              params[0].toUpperCase(), new String[] { "TABLE" });
+          result = rs.next();
+        }
+        break;
+      case DOMAIN:
+      case OPERATOR:
+      case RULE:
+      case SEQUENCE:
+      case TRIGGER:
+      case VIEW:
+        pstmt = getExistQuery(conn, type);
+
+        if (pstmt == null) {
+          throw new CatalogException("Finding " + type
+              + " type of database object is not supported on this database system.");
+        }
+
+        result = checkExistanceByQuery(pstmt, baseSchema, params);
+        break;
+      default:
+        // Any other type is reported as not existing.
+        break;
+      }
+    } finally {
+      // Release JDBC resources on every path; the prepared statement used to be leaked.
+      CatalogUtil.closeQuietly(rs);
+      CatalogUtil.closeQuietly(pstmt);
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns the upper-cased schema name for use as a JDBC metadata schema
+   * pattern, or {@code null} when no schema name is configured.
+   */
+  private static String toSchemaPattern(BaseSchema baseSchema) {
+    String schemaName = baseSchema.getSchemaName();
+    return (schemaName != null && !schemaName.isEmpty()) ? schemaName.toUpperCase() : null;
+  }
+
+  private boolean checkExistanceByQuery(PreparedStatement pstmt, BaseSchema 
baseSchema,
+      String... params) throws SQLException {
+    int paramIdx = 1;
+    boolean result = false;
+    
+    if (baseSchema.getSchemaName() != null && 
!baseSchema.getSchemaName().isEmpty()) {
+      pstmt.setString(paramIdx++, baseSchema.getSchemaName().toUpperCase());
+    }
+    
+    for (; paramIdx <= pstmt.getParameterMetaData().getParameterCount(); 
paramIdx++) {
+      pstmt.setString(paramIdx, params[paramIdx-1]);
+    }
+    
+    ResultSet rs = pstmt.executeQuery();
+    while (rs.next()) {
+      if (rs.getString(1).equals(params[params.length-1].toUpperCase())) {
+        result = true;
+        break;
+      }
+    }
+    CatalogUtil.closeQuietly(rs);
+    return result;
+  }
+
+  /**
+   * Creates every object of the loaded base schema that does not exist yet.
+   *
+   * <p>Objects are processed in the order held by the schema. Objects that
+   * already exist are skipped.</p>
+   *
+   * @param conn connection used for the existence checks and the DDL statements
+   * @throws CatalogException if the schema files are not loaded, or wrapping any
+   *         SQLException raised while checking or creating an object
+   */
+  public void createBaseSchema(Connection conn) throws CatalogException {
+    Statement stmt;
+
+    if (!isLoaded()) {
+      throw new CatalogException("Database schema files are not loaded.");
+    }
+
+    try {
+      stmt = conn.createStatement();
+    } catch (SQLException e) {
+      throw new CatalogException(e.getMessage(), e);
+    }
+
+    try {
+      for (DatabaseObject object: catalogStore.getSchema().getObjects()) {
+        try {
+          String[] params;
+          if (DatabaseObjectType.INDEX == object.getType()) {
+            // An index is identified by its owning table plus its own name.
+            params = new String[] { object.getDependsOn(), object.getName() };
+          }
+          else {
+            params = new String[] { object.getName() };
+          }
+
+          if (checkExistance(conn, object.getType(), params)) {
+            LOG.info("Skip to create " + object.getName() + " databse object. Already exists.");
+          }
+          else {
+            stmt.executeUpdate(object.getSql());
+            LOG.info(object.getName() + " " + object.getType() + " is created.");
+          }
+        } catch (SQLException e) {
+          throw new CatalogException(e.getMessage(), e);
+        }
+      }
+    } finally {
+      // The statement used to be leaked, including on the failure path.
+      CatalogUtil.closeQuietly(stmt);
+    }
+  }
+  
+  /**
+   * Applies the schema patches selected for the given catalog version.
+   *
+   * <p>Patches are selected by comparing their prior version with
+   * {@code currentVersion}, sorted by their natural order, and all of their
+   * objects' SQL statements are executed in sequence.</p>
+   *
+   * @param conn           connection used to execute the patch statements
+   * @param currentVersion schema version currently stored in the catalog
+   * @throws CatalogException if the schema files are not loaded, or wrapping any
+   *         SQLException raised while applying a patch
+   */
+  public void upgradeBaseSchema(Connection conn, int currentVersion) {
+    if (!isLoaded()) {
+      throw new CatalogException("Database schema files are not loaded.");
+    }
+
+    final List<SchemaPatch> candidatePatches = new ArrayList<SchemaPatch>();
+    Statement stmt;
+
+    // NOTE(review): this also selects patches whose prior version is lower than
+    // currentVersion; confirm that re-applying them is intended.
+    for (SchemaPatch patch: this.catalogStore.getPatches()) {
+      if (currentVersion >= patch.getPriorVersion()) {
+        candidatePatches.add(patch);
+      }
+    }
+
+    Collections.sort(candidatePatches);
+    try {
+      stmt = conn.createStatement();
+    } catch (SQLException e) {
+      throw new CatalogException(e.getMessage(), e);
+    }
+
+    try {
+      for (SchemaPatch patch: candidatePatches) {
+        for (DatabaseObject object: patch.getObjects()) {
+          try {
+            stmt.executeUpdate(object.getSql());
+            LOG.info(object.getName() + " " + object.getType() + " was created or altered.");
+          } catch (SQLException e) {
+            throw new CatalogException(e.getMessage(), e);
+          }
+        }
+      }
+    } finally {
+      // The statement used to be leaked, including on the failure path.
+      CatalogUtil.closeQuietly(stmt);
+    }
+  }
+
+  public boolean isInitialized(Connection conn) throws CatalogException {
+    if (!isLoaded()) {
+      throw new CatalogException("Database schema files are not loaded.");
+    }
+    
+    boolean result = true;
+    
+    for (DatabaseObject object: catalogStore.getSchema().getObjects()) {
+      try {
+        if (DatabaseObjectType.INDEX == object.getType()) {
+          result &= checkExistance(conn, object.getType(), 
object.getDependsOn(), object.getName());
+        }
+        else {
+          result &= checkExistance(conn, object.getType(), object.getName());
+        }
+      } catch (SQLException e) {
+        throw new CatalogException(e.getMessage(), e);
+      }
+      
+      if (!result) {
+        break;
+      }
+    }
+    return result;
+  }
+  
+  /**
+   * Lists the files of a file-system directory that pass the given filter.
+   *
+   * @param dirURL     {@code file:} URL of the directory to scan
+   * @param schemaPath prefix prepended, with a {@code /} separator, to each file name
+   * @param filter     filter selecting the file names to return
+   * @return the accepted file names prefixed with {@code schemaPath}; empty if
+   *         the URL does not denote a readable directory
+   * @throws URISyntaxException if {@code dirURL} cannot be converted to a URI
+   * @throws IOException        declared for callers; not thrown by this implementation
+   */
+  protected String[] listFileResources(URL dirURL, String schemaPath, FilenameFilter filter)
+      throws URISyntaxException, IOException {
+    File dirFile = new File(dirURL.toURI());
+    String[] fileNames = dirFile.list(filter);
+
+    // File.list returns null when the path is not a directory or an I/O error occurs.
+    if (fileNames == null) {
+      return new String[0];
+    }
+
+    String[] files = new String[fileNames.length];
+    for (int idx = 0; idx < fileNames.length; idx++) {
+      files[idx] = schemaPath+"/"+fileNames[idx];
+    }
+
+    return files;
+  }
+  
+  protected String[] listJarResources(URL dirURL, FilenameFilter filter) 
+      throws IOException, URISyntaxException {
+    String[] files = new String[0];
+    String spec = dirURL.getFile();
+    int seperator = spec.indexOf("!/");
+    
+    if (seperator == -1) {
+      return files;
+    }
+
+    URL jarFileURL = new URL(spec.substring(0, seperator));
+    JarFile jarFile = new JarFile(jarFileURL.toURI().getPath());
+    Set<String> filesSet = new HashSet<String>();
+    
+    try {
+      Enumeration<JarEntry> entries = jarFile.entries();
+      
+      while (entries.hasMoreElements()) {
+        JarEntry entry = entries.nextElement();
+        
+        if (entry.isDirectory()) {
+          continue;
+        }
+        
+        String entryName = entry.getName();
+        
+        if (entryName.indexOf(schemaPath) > -1 &&
+            filter.accept(null, entryName)) {
+          filesSet.add(entryName);
+        }
+      }
+    } finally {
+      jarFile.close();
+    }
+    
+    if (!filesSet.isEmpty()) {
+      files = new String[filesSet.size()];
+      files = filesSet.toArray(files);
+    }
+    
+    return files;
+  }
+  
+  protected String[] listResources() throws IOException, URISyntaxException {
+    String[] files = new String[0];
+    URL dirURL = ClassLoader.getSystemResource(schemaPath);
+    FilenameFilter fileFilter = new FilenameFilter() {
+      
+      @Override
+      public boolean accept(File dir, String name) {
+        return ((name.lastIndexOf('.') > -1) && 
+            (".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.')))));
+      }
+    };
+    
+    if (dirURL == null) {
+      throw new FileNotFoundException(schemaPath);
+    }
+    
+    if (dirURL.getProtocol().equals("file")) {
+      files = listFileResources(dirURL, schemaPath, fileFilter);
+    }
+    
+    if (dirURL.getProtocol().equals("jar")) {
+      files = listJarResources(dirURL, fileFilter);
+    }
+    
+    return files;
+  }
+  
+  /**
+   * Merges the store definitions read from the XML files into the single
+   * catalog store used by this manager.
+   *
+   * @param storeObjects store definitions to merge; must not be empty
+   * @throws CatalogException if {@code storeObjects} is empty or the merge fails
+   */
+  protected void mergeXmlSchemas(final List<StoreObject> storeObjects) throws CatalogException {
+    if (storeObjects.isEmpty()) {
+      throw new CatalogException("Unable to find a schema file.");
+    }
+
+    StoreObjectsMerger merger = new StoreObjectsMerger(storeObjects);
+    this.catalogStore = merger.merge();
+  }
+  
+  /**
+   * Reads every XML schema resource found under the schema path and merges the
+   * results into the catalog store.
+   *
+   * @throws FileNotFoundException if a listed resource cannot be resolved
+   * @throws IOException           if a resource cannot be read
+   * @throws XMLStreamException    if a resource cannot be parsed
+   * @throws URISyntaxException    if the schema location cannot be converted to a URI
+   */
+  protected void loadFromXmlFiles() throws IOException, XMLStreamException, URISyntaxException {
+    final XMLInputFactory inputFactory = XMLInputFactory.newInstance();
+    final List<StoreObject> loadedStores = new ArrayList<StoreObject>();
+
+    // Deliver adjacent character data as a single event.
+    inputFactory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
+
+    for (String resourceName: listResources()) {
+      final URL resourceUrl = ClassLoader.getSystemResource(resourceName);
+
+      if (resourceUrl == null) {
+        throw new FileNotFoundException(resourceName);
+      }
+
+      loadFromXmlFile(inputFactory, resourceUrl, loadedStores);
+    }
+
+    mergeXmlSchemas(loadedStores);
+  }
+  
+  protected void loadFromXmlFile(XMLInputFactory xmlIf, URL filePath, 
List<StoreObject> storeObjects) 
+      throws IOException, XMLStreamException {
+    XMLStreamReader xmlReader;
+
+    xmlReader = xmlIf.createXMLStreamReader(filePath.openStream());
+    
+    try {
+      while (xmlReader.hasNext()) {
+        if (xmlReader.next() == XMLStreamConstants.START_ELEMENT &&
+            "store".equals(xmlReader.getLocalName())) {
+          StoreObject catalogStore = loadCatalogStore(xmlReader);
+          if (catalogStore != null) {
+            storeObjects.add(catalogStore);
+          }
+        }
+      }
+    } finally {
+      try {
+        xmlReader.close();
+      } catch (XMLStreamException ignored) { }
+    }
+  }
+  
+  /**
+   * Unmarshals the element at the reader's current position into a store definition.
+   *
+   * @param xmlReader reader positioned on the element to unmarshal
+   * @return the unmarshalled store definition
+   * @throws XMLStreamException wrapping any JAXB failure, annotated with the
+   *         reader's current location
+   */
+  protected StoreObject loadCatalogStore(XMLStreamReader xmlReader) throws XMLStreamException {
+    try {
+      return unmarshaller.unmarshal(xmlReader, StoreObject.class).getValue();
+    } catch (JAXBException jaxbError) {
+      throw new XMLStreamException(jaxbError.getMessage(), xmlReader.getLocation(), jaxbError);
+    }
+  }
+  
+  /**
+   * @return the merged catalog store, or {@code null} if none has been set
+   */
+  protected StoreObject getCatalogStore() {
+    return this.catalogStore;
+  }
+  
+  /**
+   * @return {@code true} once a merged catalog store is available
+   */
+  public boolean isLoaded() {
+    return this.catalogStore != null;
+  }
+  
+  private class StoreObjectsMerger {
+    
+    private final List<StoreObject> storeObjects = new 
ArrayList<StoreObject>();
+    private final StoreObject targetStore = new StoreObject();
+    
+    /**
+     * Creates a merger working on a private copy of the given store definitions.
+     *
+     * @param schemaObjects store definitions to merge
+     */
+    public StoreObjectsMerger(List<StoreObject> schemaObjects) {
+      storeObjects.addAll(schemaObjects);
+    }
+    
+    protected void copySchemaInfo(StoreObject sourceStore) {
+      if (sourceStore.getSchema().getSchemaName() != null && 
+          !sourceStore.getSchema().getSchemaName().isEmpty()) {
+        if (targetStore.getSchema().getSchemaName() != null && 
+            !targetStore.getSchema().getSchemaName().isEmpty() &&
+            !targetStore.getSchema().getSchemaName().equalsIgnoreCase(
+                sourceStore.getSchema().getSchemaName())) {
+          throw new CatalogException("different schema names are specified. 
One is " + 
+            sourceStore.getSchema().getSchemaName() + " and other is " + 
+              targetStore.getSchema().getSchemaName());
+        }
+        
+        if (targetStore.getSchema().getSchemaName() == null || 
+            targetStore.getSchema().getSchemaName().isEmpty()) {
+          
targetStore.getSchema().setSchemaName(sourceStore.getSchema().getSchemaName());
+        }
+      }
+      
+      if (sourceStore.getSchema().getVersion() > -1 && 
+          targetStore.getSchema().getVersion() < 
sourceStore.getSchema().getVersion()) {
+        
targetStore.getSchema().setVersion(sourceStore.getSchema().getVersion());
+      }
+    }
+    
+    /**
+     * Creates a mutable list holding {@code size} null slots.
+     *
+     * @param size number of slots; values below zero are treated as zero
+     */
+    private List<DatabaseObject> createListAndFillNull(int size) {
+      return new ArrayList<DatabaseObject>(
+          Collections.<DatabaseObject>nCopies(Math.max(size, 0), null));
+    }
+
+    /**
+     * Arranges database objects for execution: objects with an explicit order
+     * (greater than -1) come first, sorted by that order, followed by the
+     * unordered objects in their original sequence.
+     *
+     * @param objects objects to arrange
+     * @return a new list containing the arranged objects
+     * @throws CatalogException if two objects declare the same order
+     */
+    protected List<DatabaseObject> mergeDatabaseObjects(List<DatabaseObject> objects) {
+      int maxIdx = -1;
+
+      for (DatabaseObject object: objects) {
+        maxIdx = Math.max(maxIdx, object.getOrder());
+      }
+
+      // One slot per possible order value (0..maxIdx). The previous code sized
+      // the list with maxIdx itself, which failed with a negative array size
+      // when no object declared an order.
+      final List<DatabaseObject> orderedObjects = createListAndFillNull(maxIdx + 1);
+      final List<DatabaseObject> unorderedObjects = new ArrayList<DatabaseObject>();
+      final List<DatabaseObject> mergedObjects = new ArrayList<DatabaseObject>();
+
+      for (DatabaseObject object: objects) {
+        int objIdx = object.getOrder();
+
+        if (objIdx > -1) {
+          if (orderedObjects.get(objIdx) != null) {
+            throw new CatalogException(
+                "This catalog configuration contains duplicated order of DatabaseObject");
+          }
+
+          // set() keeps every slot at its own index. The previous add(index, ...)
+          // shifted later slots, which reported false duplicates and missed real ones.
+          orderedObjects.set(objIdx, object);
+        }
+        else {
+          unorderedObjects.add(object);
+        }
+      }
+
+      for (DatabaseObject object: orderedObjects) {
+        if (object != null) {
+          mergedObjects.add(object);
+        }
+      }
+
+      mergedObjects.addAll(unorderedObjects);
+
+      return mergedObjects;
+    }
+    
+    /**
+     * Validates one patch against all patches of the same store.
+     *
+     * <p>Patches sharing only a prior version or only a next version with
+     * another patch are logged as warnings.</p>
+     *
+     * @param patches   all patches of the store, including {@code testPatch}
+     * @param testPatch patch to validate
+     * @throws CatalogException if the prior version is greater than the next
+     *         version, or another patch has the same prior and next versions
+     */
+    protected void validatePatch(List<SchemaPatch> patches, SchemaPatch testPatch) {
+      if (testPatch.getPriorVersion() > testPatch.getNextVersion()) {
+        throw new CatalogException("Prior version cannot proceed to next version of patch.");
+      }
+
+      for (SchemaPatch patch: patches) {
+        if (testPatch.equals(patch)) {
+          continue;
+        }
+
+        if (testPatch.getPriorVersion() == patch.getPriorVersion()) {
+          LOG.warn("It has the same prior version (" + testPatch.getPriorVersion() + ") of patch.");
+
+          if (testPatch.getNextVersion() == patch.getNextVersion()) {
+            throw new CatalogException("Duplicate versions of patch found. It will terminate Catalog Store. ");
+          }
+        }
+
+        if (testPatch.getNextVersion() == patch.getNextVersion()) {
+          // Report the next version here; the message used to print the prior version.
+          LOG.warn("It has the same next version (" + testPatch.getNextVersion() + ") of patch.");
+        }
+      }
+    }
+    
+    /**
+     * Validates the patches of one store, arranges the objects of each patch in
+     * execution order, and adds the patches to the target store.
+     *
+     * <p>The given list is sorted in place.</p>
+     *
+     * @param patches patches of one store definition
+     * @throws CatalogException if a patch fails validation or its objects
+     *         declare duplicate orders
+     */
+    protected void mergePatches(List<SchemaPatch> patches) {
+      Collections.sort(patches);
+
+      for (SchemaPatch patch: patches) {
+        validatePatch(patches, patch);
+
+        // Copy the objects before clearing the patch, then put them back in order.
+        List<DatabaseObject> tempObjects = new ArrayList<DatabaseObject>(patch.getObjects());
+        patch.clearObjects();
+        patch.addObjects(mergeDatabaseObjects(tempObjects));
+
+        targetStore.addPatch(patch);
+      }
+    }
+    
+    /**
+     * Ensures that at most one query in the list has the same type as
+     * {@code testQuery}.
+     *
+     * @param queries   queries to scan
+     * @param testQuery query whose type must be unique within {@code queries}
+     * @throws CatalogException if more than one query has that type
+     */
+    protected void validateSQLObject(List<SQLObject> queries, SQLObject testQuery) {
+      int sameTypeCount = 0;
+
+      for (SQLObject candidate: queries) {
+        if (candidate.getType() == testQuery.getType()) {
+          sameTypeCount += 1;
+        }
+      }
+
+      if (sameTypeCount > 1) {
+        throw new CatalogException("Duplicate Query type (" + testQuery.getType() + ") has found.");
+      }
+    }
+    
+    /**
+     * Validates each existence query of one store and adds it to the target store.
+     *
+     * @param queries existence queries of one store definition
+     * @throws CatalogException if two queries in the list share a type
+     */
+    protected void mergeExistQueries(List<SQLObject> queries) {
+      for (SQLObject existQuery: queries) {
+        validateSQLObject(queries, existQuery);
+        targetStore.addExistQuery(existQuery);
+      }
+    }
+    
+    /**
+     * Validates each drop statement of one store and adds it to the target store.
+     *
+     * @param queries drop statements of one store definition
+     * @throws CatalogException if two statements in the list share a type
+     */
+    protected void mergeDropStatements(List<SQLObject> queries) {
+      for (SQLObject dropStatement: queries) {
+        validateSQLObject(queries, dropStatement);
+        targetStore.addDropStatement(dropStatement);
+      }
+    }
+    
+    /**
+     * Merges all store definitions into the target store.
+     *
+     * <p>The first pass copies the schema name and the highest version to the
+     * target. The second pass takes the base schema objects from the first
+     * store whose version equals that target version, and collects the patches,
+     * existence queries and drop statements of every store.</p>
+     *
+     * @return the merged store
+     * @throws CatalogException if the definitions are inconsistent
+     */
+    public StoreObject merge() {
+      // Pass 1: settle the schema name and version of the target.
+      for (StoreObject source: this.storeObjects) {
+        copySchemaInfo(source);
+      }
+
+      // Pass 2: pick the base objects once and accumulate everything else.
+      boolean baseObjectsAssigned = false;
+
+      for (StoreObject source: this.storeObjects) {
+        if (!baseObjectsAssigned &&
+            source.getSchema().getVersion() == targetStore.getSchema().getVersion()) {
+          final BaseSchema mergedSchema = targetStore.getSchema();
+          mergedSchema.clearObjects();
+          mergedSchema.addObjects(mergeDatabaseObjects(source.getSchema().getObjects()));
+          baseObjectsAssigned = true;
+        }
+
+        mergePatches(source.getPatches());
+        mergeExistQueries(source.getExistQueries());
+        mergeDropStatements(source.getDropStatements());
+      }
+
+      return this.targetStore;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java
new file mode 100644
index 0000000..3c9a3e2
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BaseSchema implements Comparable<BaseSchema> {
+
+  @XmlAttribute(name="version",required=true)
+  private int version;
+  @XmlAttribute(name="schemaname")
+  private String schemaName;
+  
@XmlElementWrapper(name="objects",namespace="http://tajo.apache.org/catalogstore";)
+  @XmlElement(name="Object",namespace="http://tajo.apache.org/catalogstore";)
+  private final List<DatabaseObject> objects = new ArrayList<DatabaseObject>();
+  
+  public int getVersion() {
+    return version;
+  }
+  public void setVersion(int version) {
+    this.version = version;
+  }
+  public String getSchemaName() {
+    return schemaName;
+  }
+  public void setSchemaName(String schemaName) {
+    this.schemaName = schemaName;
+  }
+  public List<DatabaseObject> getObjects() {
+    return objects;
+  }
+  public void addObject(DatabaseObject object) {
+    this.objects.add(object);
+  }
+  public void addObjects(Collection<DatabaseObject> objects) {
+    this.objects.addAll(objects);
+  }
+  public void clearObjects() {
+    this.objects.clear();
+  }
+  
+  @Override
+  public int compareTo(BaseSchema o) {
+    return (int) Math.signum(getVersion()-o.getVersion());
+  }
+  @Override
+  public String toString() {
+    return "BaseSchema [version=" + version + ", schemaName=" + schemaName + 
", objects=" + objects + "]";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java
new file mode 100644
index 0000000..4132018
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+
+@XmlAccessorType(XmlAccessType.FIELD)
+public class DatabaseObject implements Comparable<DatabaseObject> {
+  
+  @XmlAttribute(name="order")
+  private int order=-1;
+  @XmlAttribute(name="type",required=true)
+  private DatabaseObjectType type;
+  @XmlAttribute(name="name",required=true)
+  private String name;
+  @XmlAttribute(name="dependsOn")
+  private String dependsOn;
+  @XmlElement(name="sql",namespace="http://tajo.apache.org/catalogstore";)
+  private String sql;
+  
+  public int getOrder() {
+    return order;
+  }
+  public void setOrder(int order) {
+    this.order = order;
+  }
+  public DatabaseObjectType getType() {
+    return type;
+  }
+  public void setType(DatabaseObjectType type) {
+    this.type = type;
+  }
+  public String getName() {
+    return name;
+  }
+  public void setName(String name) {
+    this.name = name;
+  }
+  public String getDependsOn() {
+    return dependsOn;
+  }
+  public void setDependsOn(String dependsOn) {
+    this.dependsOn = dependsOn;
+  }
+  public String getSql() {
+    return sql;
+  }
+  public void setSql(String sqlStatement) {
+    this.sql = sqlStatement;
+  }
+  @Override
+  public String toString() {
+    return "DatabaseObject [order=" + order + ", type=" + type + ", name=" + 
name + ", dependsOn=" + dependsOn
+        + ", sql=" + sql + "]";
+  }
+  @Override
+  public int compareTo(DatabaseObject o) {
+    return (int) Math.signum(getOrder() - o.getOrder());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java
new file mode 100644
index 0000000..b41dfb9
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlEnumValue;
+import javax.xml.bind.annotation.XmlType;
+/**
+ * Kinds of database objects that can appear in a catalog schema definition.
+ * Each constant is bound through {@code @XmlEnumValue} to the lower-case
+ * string used for it in XML.
+ *
+ * NOTE(review): the XML type name is "DatabaseObjectsType" (plural), which
+ * differs from this enum's name - presumably it mirrors the type name used
+ * in the schema definition; confirm before renaming either side.
+ */
+@XmlType(name="DatabaseObjectsType")
+@XmlEnum(String.class)
+public enum DatabaseObjectType {
+  @XmlEnumValue("table")
+  TABLE,
+  @XmlEnumValue("view")
+  VIEW,
+  @XmlEnumValue("function")
+  FUNCTION,
+  @XmlEnumValue("operator")
+  OPERATOR,
+  @XmlEnumValue("data")
+  DATA,
+  @XmlEnumValue("domain")
+  DOMAIN,
+  @XmlEnumValue("trigger")
+  TRIGGER,
+  @XmlEnumValue("rule")
+  RULE,
+  @XmlEnumValue("sequence")
+  SEQUENCE,
+  @XmlEnumValue("index")
+  INDEX;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java
new file mode 100644
index 0000000..e22dc73
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * JAXB-bound pair of a database object type and a SQL statement that applies
+ * to objects of that type.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SQLObject {
+
+  @XmlAttribute
+  private DatabaseObjectType type;
+  @XmlElement(name="sql",namespace="http://tajo.apache.org/catalogstore")
+  private String sql;
+
+  public DatabaseObjectType getType() {
+    return type;
+  }
+  public void setType(DatabaseObjectType type) {
+    this.type = type;
+  }
+  public String getSql() {
+    return sql;
+  }
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+
+  @Override
+  public String toString() {
+    // Label with the actual class name; the previous "ExistQuery" label was
+    // misleading because this class is not specific to exist queries.
+    return "SQLObject [type=" + type + ", sql=" + sql + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java
new file mode 100644
index 0000000..3859d03
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+
+/**
+ * JAXB-bound description of a schema patch: the version it starts from
+ * ({@code priorVersion}), the version it leads to ({@code nextVersion}) and
+ * the database objects it carries. Instances are ordered by prior version,
+ * then by next version.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SchemaPatch implements Comparable<SchemaPatch> {
+
+  @XmlAttribute
+  private int priorVersion;
+  @XmlAttribute
+  private int nextVersion;
+  @XmlElementWrapper(name="objects",namespace="http://tajo.apache.org/catalogstore")
+  @XmlElement(name="Object",namespace="http://tajo.apache.org/catalogstore")
+  private final List<DatabaseObject> objects = new ArrayList<DatabaseObject>();
+
+  public int getPriorVersion() {
+    return priorVersion;
+  }
+  public void setPriorVersion(int priorVersion) {
+    this.priorVersion = priorVersion;
+  }
+  public int getNextVersion() {
+    return nextVersion;
+  }
+  public void setNextVersion(int nextVersion) {
+    this.nextVersion = nextVersion;
+  }
+  /**
+   * @return the live internal list of database objects (not a copy)
+   */
+  public List<DatabaseObject> getObjects() {
+    return objects;
+  }
+  public void addObject(DatabaseObject object) {
+    this.objects.add(object);
+  }
+  public void addObjects(List<DatabaseObject> objects) {
+    this.objects.addAll(objects);
+  }
+  public void clearObjects() {
+    this.objects.clear();
+  }
+  /**
+   * Orders patches by ascending prior version; ties are broken by ascending
+   * next version.
+   *
+   * @param o the patch to compare against
+   * @return -1, 0 or 1 following the ordering described above
+   */
+  @Override
+  public int compareTo(SchemaPatch o) {
+    int result = compareVersions(getPriorVersion(), o.getPriorVersion());
+    if (result == 0) {
+      result = compareVersions(getNextVersion(), o.getNextVersion());
+    }
+    return result;
+  }
+  /**
+   * Three-way int comparison that avoids subtraction, because a difference
+   * of two ints can overflow and report the wrong sign.
+   */
+  private static int compareVersions(int left, int right) {
+    return (left < right) ? -1 : ((left == right) ? 0 : 1);
+  }
+  @Override
+  public String toString() {
+    return "SchemaPatch [priorVersion=" + priorVersion + ", nextVersion="
+        + nextVersion + ", objects=" + objects + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java
----------------------------------------------------------------------
diff --git 
a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java
 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java
new file mode 100644
index 0000000..c83ee3b
--- /dev/null
+++ 
b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store.object;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlElementWrapper;
+
+/**
+ * JAXB-bound container for one catalog store definition: a base schema plus
+ * lists of exist queries, drop statements and schema patches. The three lists
+ * are created empty and are only ever modified in place.
+ */
+@XmlAccessorType(XmlAccessType.FIELD)
+public class StoreObject {
+
+  @XmlElement(name="base",namespace="http://tajo.apache.org/catalogstore")
+  private BaseSchema schema = new BaseSchema();
+  @XmlElementWrapper(name="existQueries",namespace="http://tajo.apache.org/catalogstore")
+  @XmlElement(name="existQuery",namespace="http://tajo.apache.org/catalogstore")
+  private final List<SQLObject> existQueries = new ArrayList<SQLObject>();
+  @XmlElementWrapper(name="dropStatements",namespace="http://tajo.apache.org/catalogstore")
+  @XmlElement(name="dropStatement",namespace="http://tajo.apache.org/catalogstore")
+  private final List<SQLObject> dropStatements = new ArrayList<SQLObject>();
+  @XmlElementWrapper(name="patches",namespace="http://tajo.apache.org/catalogstore")
+  @XmlElement(name="patch",namespace="http://tajo.apache.org/catalogstore")
+  private final List<SchemaPatch> patches = new ArrayList<SchemaPatch>();
+
+  public BaseSchema getSchema() {
+    return schema;
+  }
+  public void setSchema(BaseSchema schema) {
+    this.schema = schema;
+  }
+  /**
+   * @return the live internal list of exist queries (not a copy)
+   */
+  public List<SQLObject> getExistQueries() {
+    return existQueries;
+  }
+  public void addExistQuery(SQLObject query) {
+    this.existQueries.add(query);
+  }
+  public void addExistQueries(Collection<SQLObject> queries) {
+    this.existQueries.addAll(queries);
+  }
+  public void clearExistQueries() {
+    this.existQueries.clear();
+  }
+  /**
+   * @return the live internal list of drop statements (not a copy)
+   */
+  public List<SQLObject> getDropStatements() {
+    return dropStatements;
+  }
+  public void addDropStatement(SQLObject query) {
+    this.dropStatements.add(query);
+  }
+  public void addDropStatements(Collection<SQLObject> queries) {
+    this.dropStatements.addAll(queries);
+  }
+  public void clearDropStatements() {
+    this.dropStatements.clear();
+  }
+  /**
+   * @return the live internal list of schema patches (not a copy)
+   */
+  public List<SchemaPatch> getPatches() {
+    return patches;
+  }
+  public void addPatch(SchemaPatch patch) {
+    this.patches.add(patch);
+  }
+  public void addPatches(Collection<SchemaPatch> patches) {
+    this.patches.addAll(patches);
+  }
+  public void clearPatches() {
+    this.patches.clear();
+  }
+}

Reply via email to