TAJO-1109: Separate SQL Statements from Catalog Stores. (Jihun Kang via hyunsik)
Closes #201 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/00555685 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/00555685 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/00555685 Branch: refs/heads/hbase_storage Commit: 005556850e60f14d5e11ca74035b195461f31d29 Parents: 2b8d30c Author: Hyunsik Choi <[email protected]> Authored: Tue Nov 11 14:16:19 2014 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Tue Nov 11 14:16:19 2014 -0800 ---------------------------------------------------------------------- CHANGES | 3 + tajo-catalog/tajo-catalog-server/pom.xml | 6 + .../tajo/catalog/store/AbstractDBStore.java | 96 ++- .../apache/tajo/catalog/store/DerbyStore.java | 330 +-------- .../apache/tajo/catalog/store/OracleStore.java | 337 +-------- .../tajo/catalog/store/PostgreSQLStore.java | 293 +------- .../catalog/store/XMLCatalogSchemaManager.java | 698 +++++++++++++++++++ .../tajo/catalog/store/object/BaseSchema.java | 76 ++ .../catalog/store/object/DatabaseObject.java | 80 +++ .../store/object/DatabaseObjectType.java | 48 ++ .../tajo/catalog/store/object/SQLObject.java | 52 ++ .../tajo/catalog/store/object/SchemaPatch.java | 78 +++ .../tajo/catalog/store/object/StoreObject.java | 87 +++ .../resources/schemas/DBMSSchemaDefinition.xsd | 177 +++++ .../main/resources/schemas/derby/columns.sql | 8 - .../main/resources/schemas/derby/databases.sql | 6 - .../resources/schemas/derby/databases_idx.sql | 1 - .../src/main/resources/schemas/derby/derby.xml | 186 +++++ .../main/resources/schemas/derby/indexes.sql | 12 - .../schemas/derby/partition_methods.sql | 6 - .../main/resources/schemas/derby/partitions.sql | 10 - .../src/main/resources/schemas/derby/stats.sql | 6 - .../schemas/derby/table_properties.sql | 6 - .../src/main/resources/schemas/derby/tables.sql | 10 - .../resources/schemas/derby/tablespaces.sql | 7 - .../main/resources/schemas/oracle/oracle.xml | 218 ++++++ .../resources/schemas/postgresql/indexes.sql | 14 - .../resources/schemas/postgresql/postgresql.xml | 203 ++++++ .../main/resources/schemas/postgresql/stats.sql | 6 - .../store/TestXMLCatalogSchemaManager.java | 496 +++++++++++++ .../schemas/derbytest/loadtest/derby.xml | 191 +++++ .../derbytest/mergetest/base_version_1.xml | 35 + .../derbytest/mergetest/base_version_2.xml | 63 ++ .../schemas/derbytest/querytest/derby.xml | 78 +++ .../derbytest/upgradetest/base_version_2.xml | 57 ++ 35 files changed, 2934 insertions(+), 1046 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index d71905b..1337f87 100644 --- a/CHANGES +++ b/CHANGES @@ -13,6 +13,9 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1109: Separate SQL Statements from Catalog Stores. + (Jihun Kang via hyunsik) + TAJO-1161: Remove joda time dependency from tajo-core. (Jihun Kang via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml index f811150..a2801e6 100644 --- a/tajo-catalog/tajo-catalog-server/pom.xml +++ b/tajo-catalog/tajo-catalog-server/pom.xml @@ -171,6 +171,12 @@ <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-library</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index bcf6774..7c1baab 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -22,6 +22,7 @@ package org.apache.tajo.catalog.store; import com.google.protobuf.InvalidProtocolBufferException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,7 +36,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.exception.InternalException; import org.apache.tajo.exception.UnimplementedException; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.Pair; @@ -58,21 +58,40 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo protected final String catalogUri; private Connection conn; - + protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>(); + + protected XMLCatalogSchemaManager catalogSchemaManager; protected abstract String getCatalogDriverName(); + + protected String getCatalogSchemaPath() { + return ""; + } protected abstract Connection createConnection(final Configuration conf) throws SQLException; + + protected void createDatabaseDependants() throws CatalogException { + + } + + protected boolean isInitialized() throws CatalogException { + return catalogSchemaManager.isInitialized(getConnection()); + } - protected abstract boolean isInitialized() throws CatalogException; - - protected abstract void createBaseTable() throws CatalogException; + protected void createBaseTable() throws CatalogException { + createDatabaseDependants(); + + catalogSchemaManager.createBaseSchema(getConnection()); + + insertSchemaVersion(); + } - protected abstract void dropBaseTable() throws CatalogException; + protected void dropBaseTable() throws CatalogException { + catalogSchemaManager.dropBaseSchema(getConnection()); + } public AbstractDBStore(Configuration conf) throws InternalException { - this.conf = conf; if (conf.get(CatalogConstants.DEPRECATED_CATALOG_URI) != null) { @@ -116,6 +135,11 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo + ")", e); } + String schemaPath = getCatalogSchemaPath(); + if (schemaPath != null && !schemaPath.isEmpty()) { + this.catalogSchemaManager = new XMLCatalogSchemaManager(schemaPath); + } + try { if (isInitialized()) { LOG.info("The base tables of CatalogServer already is initialized."); @@ -138,7 +162,9 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo } } - public abstract int getDriverVersion(); + public int getDriverVersion() { + return catalogSchemaManager.getCatalogStore().getSchema().getVersion(); + } public String readSchemaFile(String path) throws CatalogException { try { @@ -176,42 +202,52 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo return conn; } - private void verifySchemaVersion() throws CatalogException { + private int getSchemaVersion() { Connection conn = null; PreparedStatement pstmt = null; ResultSet result = null; + int schemaVersion = -1; + + String sql = "SELECT version FROM META"; + if (LOG.isDebugEnabled()) { + LOG.debug(sql.toString()); + } try { - String sql = "SELECT version FROM META"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql.toString()); - } - conn = getConnection(); pstmt = conn.prepareStatement(sql); result = pstmt.executeQuery(); - boolean noVersion = !result.next(); - - int schemaVersion = result.getInt(1); - if (noVersion || schemaVersion != getDriverVersion()) { - LOG.error(String.format("Catalog version (%d) and current driver version (%d) are mismatch to each other", - schemaVersion, getDriverVersion())); - LOG.error("========================================================================="); - LOG.error("| Catalog Store Migration Is Needed |"); - LOG.error("========================================================================="); - LOG.error("| You might downgrade or upgrade Apache Tajo. Downgrading or upgrading |"); - LOG.error("| Tajo without migration process is only available in some versions. |"); - LOG.error("| In order to learn how to migration Apache Tajo instance, |"); - LOG.error("| please refer http://s.apache.org/0_8_migration. |"); - LOG.error("========================================================================="); - throw new CatalogException("Migration Needed. Please refer http://s.apache.org/0_8_migration."); + if (result.next()) { + schemaVersion = result.getInt("VERSION"); } } catch (SQLException e) { - throw new CatalogException(e); + throw new CatalogException(e.getMessage(), e); } finally { CatalogUtil.closeQuietly(pstmt, result); } + + return schemaVersion; + } + + private void verifySchemaVersion() throws CatalogException { + int schemaVersion = -1; + + schemaVersion = getSchemaVersion(); + + if (schemaVersion == -1 || schemaVersion != getDriverVersion()) { + LOG.error(String.format("Catalog version (%d) and current driver version (%d) are mismatch to each other", + schemaVersion, getDriverVersion())); + LOG.error("========================================================================="); + LOG.error("| Catalog Store Migration Is Needed |"); + LOG.error("========================================================================="); + LOG.error("| You might downgrade or upgrade Apache Tajo. Downgrading or upgrading |"); + LOG.error("| Tajo without migration process is only available in some versions. |"); + LOG.error("| In order to learn how to migration Apache Tajo instance, |"); + LOG.error("| please refer http://s.apache.org/0_8_migration. |"); + LOG.error("========================================================================="); + throw new CatalogException("Migration Needed. Please refer http://s.apache.org/0_8_migration."); + } LOG.info(String.format("The compatibility of the catalog schema (version: %d) has been verified.", getDriverVersion())); http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java index d6f9fc3..4a1053a 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java @@ -27,15 +27,9 @@ import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.exception.InternalException; import java.sql.*; -import java.util.HashMap; -import java.util.Map; public class DerbyStore extends AbstractDBStore { - /** 2014-03-20: First versioning */ - private static final int DERBY_STORE_VERSION_2 = 2; - /** Before 2013-03-20 */ - private static final int DERBY_STORE_VERSION_1 = 1; private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver"; protected String getCatalogDriverName(){ @@ -46,10 +40,6 @@ public class DerbyStore extends AbstractDBStore { super(conf); } - public int getDriverVersion() { - return DERBY_STORE_VERSION_2; - } - protected Connection createConnection(Configuration conf) throws SQLException { return DriverManager.getConnection(getCatalogUri()); } @@ -59,306 +49,6 @@ public class DerbyStore extends AbstractDBStore { return super.readSchemaFile("derby/" + filename); } - // TODO - DDL and index statements should be renamed - protected void createBaseTable() throws CatalogException { - Connection conn = null; - Statement stmt = null; - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - //META - if (!baseTableMaps.get(TB_META)) { - - String sql = super.readSchemaFile("common/meta.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - - LOG.info("Table '" + TB_META + "' is created."); - baseTableMaps.put(TB_META, true); - } - - // TABLE SPACES - if (!baseTableMaps.get(TB_SPACES)) { - - String sql = readSchemaFile("tablespaces.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - - LOG.info("Table '" + TB_SPACES + "' is created."); - baseTableMaps.put(TB_SPACES, true); - } - - // DATABASES - if (!baseTableMaps.get(TB_DATABASES)) { - String sql = readSchemaFile("databases.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = readSchemaFile("databases_idx.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_DATABASES + "' is created."); - baseTableMaps.put(TB_DATABASES, true); - } - - // TABLES - if (!baseTableMaps.get(TB_TABLES)) { - - String sql = readSchemaFile("tables.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - sql = "CREATE UNIQUE INDEX idx_tables_tid on TABLES (TID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - - sql = "CREATE UNIQUE INDEX idx_tables_name on TABLES (DB_ID, TABLE_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_TABLES + "' is created."); - baseTableMaps.put(TB_TABLES, true); - } - - // COLUMNS - if (!baseTableMaps.get(TB_COLUMNS)) { - String sql = readSchemaFile("columns.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX idx_fk_columns_table_name on " + TB_COLUMNS + "(TID, COLUMN_NAME)"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - stmt.executeBatch(); - LOG.info("Table '" + TB_COLUMNS + " is created."); - baseTableMaps.put(TB_COLUMNS, true); - } - - // OPTIONS - if (!baseTableMaps.get(TB_OPTIONS)) { - String sql = readSchemaFile("table_properties.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - sql = "CREATE INDEX idx_options_key on " + TB_OPTIONS + "(TID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - - LOG.info("Table '" + TB_OPTIONS + " is created."); - baseTableMaps.put(TB_OPTIONS, true); - } - - // INDEXES - if (!baseTableMaps.get(TB_INDEXES)) { - String sql = readSchemaFile("indexes.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX idx_indexes_pk ON " + TB_INDEXES + "(" + COL_DATABASES_PK + ",index_name)"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX idx_indexes_columns ON " + TB_INDEXES + "(" + COL_DATABASES_PK + ",column_name)"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - stmt.executeBatch(); - LOG.info("Table '" + TB_INDEXES + "' is created."); - baseTableMaps.put(TB_INDEXES, true); - } - - if (!baseTableMaps.get(TB_STATISTICS)) { - String sql = readSchemaFile("stats.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - sql = "CREATE UNIQUE INDEX idx_stats_table_name ON " + TB_STATISTICS + "(TID)"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - stmt.executeBatch(); - LOG.info("Table '" + TB_STATISTICS + "' is created."); - baseTableMaps.put(TB_STATISTICS, true); - } - - // PARTITION_METHODS - if (!baseTableMaps.get(TB_PARTITION_METHODS)) { - String sql = readSchemaFile("partition_methods.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - sql = "CREATE INDEX idx_partition_methods_table_id ON " + TB_PARTITION_METHODS + "(TID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - stmt.executeBatch(); - LOG.info("Table '" + TB_PARTITION_METHODS + "' is created."); - baseTableMaps.put(TB_PARTITION_METHODS, true); - } - - // PARTITIONS - if (!baseTableMaps.get(TB_PARTTIONS)) { - String sql = readSchemaFile("partitions.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - - sql = "CREATE INDEX idx_partitions_table_name ON PARTITIONS(TID)"; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - stmt.executeBatch(); - LOG.info("Table '" + TB_PARTTIONS + "' is created."); - baseTableMaps.put(TB_PARTTIONS, true); - } - - insertSchemaVersion(); - - } catch (SQLException se) { - throw new CatalogException("failed to create base tables for Derby catalog store.", se); - } finally { - CatalogUtil.closeQuietly(stmt); - } - } - - @Override - protected void dropBaseTable() throws CatalogException { - Connection conn; - Statement stmt = null; - Map<String, Boolean> droppedTable = new HashMap<String, Boolean>(); - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) { - String sql = "DROP TABLE " + entry.getKey(); - stmt.addBatch(sql); - droppedTable.put(entry.getKey(), true); - } - } - if(baseTableMaps.get(TB_TABLES)) { - String sql = "DROP TABLE " + TB_TABLES; - stmt.addBatch(sql); - droppedTable.put(TB_TABLES, true); - } - stmt.executeBatch(); - - for(String tableName : droppedTable.keySet()) { - LOG.info("Table '" + tableName + "' is dropped"); - } - } catch (SQLException se) { - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(stmt); - } - } - - @Override - protected boolean isInitialized() throws CatalogException { - Connection conn; - ResultSet res = null; - - try { - conn = getConnection(); - res = conn.getMetaData().getTables(null, null, null, - new String [] {"TABLE"}); - - baseTableMaps.put(TB_META, false); - baseTableMaps.put(TB_SPACES, false); - baseTableMaps.put(TB_DATABASES, false); - baseTableMaps.put(TB_TABLES, false); - baseTableMaps.put(TB_COLUMNS, false); - baseTableMaps.put(TB_OPTIONS, false); - baseTableMaps.put(TB_STATISTICS, false); - baseTableMaps.put(TB_INDEXES, false); - baseTableMaps.put(TB_PARTITION_METHODS, false); - baseTableMaps.put(TB_PARTTIONS, false); - - while (res.next()) { - baseTableMaps.put(res.getString("TABLE_NAME"), true); - } - } catch (SQLException se){ - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(res); - } - - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if (!entry.getValue()) { - return false; - } - } - - return true; - } - - @Override public final void close() { Connection conn = null; @@ -379,4 +69,24 @@ public class DerbyStore extends AbstractDBStore { } LOG.info("Shutdown database (" + catalogUri + ")"); } + + @Override + protected void createDatabaseDependants() throws CatalogException { + String schemaName = catalogSchemaManager.getCatalogStore().getSchema().getSchemaName(); + Statement stmt = null; + + if (schemaName != null && !schemaName.isEmpty()) { + try { + stmt = getConnection().createStatement(); + stmt.executeUpdate("CREATE SCHEMA " + schemaName); + } catch (SQLException e) { + throw new CatalogException(e); + } + } + } + + @Override + protected String getCatalogSchemaPath() { + return "schemas/derby"; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java index cea777e..45c153c 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/OracleStore.java @@ -15,25 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.tajo.catalog.store; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.exception.InternalException; public class OracleStore extends AbstractDBStore { - private static final int ORACLE_STORE_VERSION = 2; private static final String CATALOG_DRIVER = "oracle.jdbc.OracleDriver"; public OracleStore(Configuration conf) throws InternalException { @@ -46,335 +40,18 @@ public class OracleStore extends AbstractDBStore { } @Override - protected Connection createConnection(Configuration conf) throws SQLException { - return DriverManager.getConnection(getCatalogUri(), this.connectionId, this.connectionPassword); - } - - @Override - protected boolean isInitialized() throws CatalogException { - Connection conn; - ResultSet res = null; - - try { - conn = getConnection(); - res = conn.getMetaData().getTables(null, - this.connectionId != null?this.connectionId.toUpperCase():null, - null, new String[] { "TABLE" }); - - baseTableMaps.put(TB_META, false); - baseTableMaps.put(TB_SPACES, false); - baseTableMaps.put(TB_DATABASES, false); - baseTableMaps.put(TB_TABLES, false); - baseTableMaps.put(TB_COLUMNS, false); - baseTableMaps.put(TB_OPTIONS, false); - baseTableMaps.put(TB_STATISTICS, false); - baseTableMaps.put(TB_INDEXES, false); - baseTableMaps.put(TB_PARTITION_METHODS, false); - baseTableMaps.put(TB_PARTTIONS, false); - - while (res.next()) { - baseTableMaps.put(res.getString("TABLE_NAME"), true); - } - } catch (SQLException se) { - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(res); - } - - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if (!entry.getValue()) { - return false; - } - } - - return true; - } - - @Override - protected void createBaseTable() throws CatalogException { - Statement stmt = null; - Connection conn = null; - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - - // META - if (!baseTableMaps.get(TB_META)) { - String sql = super.readSchemaFile("common/meta.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_META + " is created."); - baseTableMaps.put(TB_META, true); - } - - // TABLE SPACES - if (!baseTableMaps.get(TB_SPACES)) { - String sql = readSchemaFile("tablespaces.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE SEQUENCE TABLESPACES_SEQ"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE OR REPLACE TRIGGER TABLESPACES_AUTOINC " + - "BEFORE INSERT ON TABLESPACES " + - "FOR EACH ROW " + - "WHEN (new.SPACE_ID IS NULL) " + - "BEGIN " + - " SELECT TABLESPACES_SEQ.NEXTVAL INTO :new.SPACE_ID FROM DUAL; " + - "END;"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_SPACES + "' is created."); - baseTableMaps.put(TB_SPACES, true); - } - - // DATABASES - if (!baseTableMaps.get(TB_DATABASES)) { - String sql = readSchemaFile("databases.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE SEQUENCE DATABASES__SEQ"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE OR REPLACE TRIGGER DATABASES__AUTOINC " + - "BEFORE INSERT ON DATABASES_ " + - "FOR EACH ROW " + - "WHEN (new.DB_ID IS NULL) " + - "BEGIN " + - " SELECT DATABASES__SEQ.NEXTVAL INTO :new.DB_ID FROM DUAL; " + - "END;"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_DATABASES + "' is created."); - baseTableMaps.put(TB_DATABASES, true); - } - - // TABLES - if (!baseTableMaps.get(TB_TABLES)) { - String sql = readSchemaFile("tables.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE SEQUENCE TABLES_SEQ"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE OR REPLACE TRIGGER TABLES_AUTOINC " + - "BEFORE INSERT ON TABLES " + - "FOR EACH ROW " + - "WHEN (new.TID IS NULL) " + - "BEGIN " + - " SELECT TABLES_SEQ.NEXTVAL INTO :new.TID FROM DUAL; " + - "END;"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX TABLES_IDX_DB_ID on TABLES (DB_ID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX TABLES_IDX_TABLE_ID on TABLES (DB_ID, TABLE_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_TABLES + "' is created."); - baseTableMaps.put(TB_TABLES, true); - } - - // COLUMNS - if (!baseTableMaps.get(TB_COLUMNS)) { - String sql = readSchemaFile("columns.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_COLUMNS + " is created."); - baseTableMaps.put(TB_COLUMNS, true); - } - - // OPTIONS - if (!baseTableMaps.get(TB_OPTIONS)) { - String sql = readSchemaFile("table_properties.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_OPTIONS + " is created."); - baseTableMaps.put(TB_OPTIONS, true); - } - - // INDEXES - if (!baseTableMaps.get(TB_INDEXES)) { - String sql = readSchemaFile("indexes.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, COLUMN_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_INDEXES + "' is created."); - baseTableMaps.put(TB_INDEXES, true); - } - - if (!baseTableMaps.get(TB_STATISTICS)) { - String sql = readSchemaFile("stats.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_STATISTICS + "' is created."); - baseTableMaps.put(TB_STATISTICS, true); - } - - // PARTITION_METHODS - if (!baseTableMaps.get(TB_PARTITION_METHODS)) { - String sql = readSchemaFile("partition_methods.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_PARTITION_METHODS + "' is created."); - baseTableMaps.put(TB_PARTITION_METHODS, true); - } - - // PARTITIONS - if (!baseTableMaps.get(TB_PARTTIONS)) { - String sql = readSchemaFile("partitions.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX PARTITIONS_IDX_TID on PARTITIONS (TID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_PARTTIONS + "' is created."); - baseTableMaps.put(TB_PARTTIONS, true); - } - - insertSchemaVersion(); - - } catch (SQLException se) { - throw new CatalogException("failed to create base tables for Oracle catalog store", se); - } finally { - CatalogUtil.closeQuietly(stmt); - } + protected String getCatalogSchemaPath() { + return "schemas/oracle"; } @Override - protected void dropBaseTable() throws CatalogException { - Connection conn; - Statement stmt = null; - PreparedStatement dropSequence = null; - Map<String, Boolean> droppedTable = new HashMap<String, Boolean>(); - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) { - String sql = "DROP TABLE " + entry.getKey(); - stmt.addBatch(sql); - droppedTable.put(entry.getKey(), true); - } - } - if(baseTableMaps.get(TB_TABLES)) { - String sql = "DROP TABLE " + TB_TABLES; - stmt.addBatch(sql); - droppedTable.put(TB_TABLES, true); - } - stmt.executeBatch(); - - for(String tableName : droppedTable.keySet()) { - LOG.info("Table '" + tableName + "' is dropped"); - } - - // Drop sequences - dropSequence = conn.prepareStatement("SELECT COUNT(*) FROM USER_SEQUENCES WHERE SEQUENCE_NAME = ?"); - for(String tableName : droppedTable.keySet()) { - String possibleSeqName = tableName + "_SEQ"; - dropSequence.setString(1, possibleSeqName); - ResultSet rs = dropSequence.executeQuery(); - - if (rs.next() && rs.getInt(1) > 0) { - String sql = "DROP SEQUENCE " + possibleSeqName; - stmt.execute(sql); - LOG.info("Sequence '" + possibleSeqName + "' is dropped"); - } - } - } catch (SQLException se) { - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(stmt); - CatalogUtil.closeQuietly(dropSequence); - } + protected Connection createConnection(Configuration conf) throws SQLException { + return DriverManager.getConnection(getCatalogUri(), this.connectionId, this.connectionPassword); } @Override - public int getDriverVersion() { - return ORACLE_STORE_VERSION; + protected void createDatabaseDependants() throws CatalogException { + } - @Override - public String readSchemaFile(String path) throws CatalogException { - return super.readSchemaFile("oracle/" + path); - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java index a06cf19..41f2909 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/PostgreSQLStore.java @@ -15,26 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.tajo.catalog.store; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.exception.InternalException; public class PostgreSQLStore extends AbstractDBStore { - private static final int POSTGRESQL_STORE_VERSION = 2; private static final String CATALOG_DRIVER = "org.postgresql.Driver"; - + public PostgreSQLStore(Configuration conf) throws InternalException { super(conf); } @@ -43,290 +38,20 @@ public class PostgreSQLStore extends AbstractDBStore { protected String getCatalogDriverName() { return CATALOG_DRIVER; } + + @Override + protected String getCatalogSchemaPath() { + return "schemas/postgresql"; + } @Override protected Connection createConnection(Configuration conf) throws SQLException { return DriverManager.getConnection(getCatalogUri(), this.connectionId, this.connectionPassword); } - + @Override - protected boolean isInitialized() throws CatalogException { - Connection conn; - ResultSet res = null; - - try { - conn = getConnection(); - res = conn.getMetaData().getTables(null, null, null, new String[] { "TABLE" }); - - baseTableMaps.put(TB_META, false); - baseTableMaps.put(TB_SPACES, false); - baseTableMaps.put(TB_DATABASES, false); - baseTableMaps.put(TB_TABLES, false); - baseTableMaps.put(TB_COLUMNS, false); - baseTableMaps.put(TB_OPTIONS, false); - baseTableMaps.put(TB_STATISTICS, false); - baseTableMaps.put(TB_INDEXES, false); - baseTableMaps.put(TB_PARTITION_METHODS, false); - baseTableMaps.put(TB_PARTTIONS, false); - - while (res.next()) { - baseTableMaps.put(res.getString("TABLE_NAME").toUpperCase(), true); - } - } catch (SQLException se) { - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(res); - } + protected void createDatabaseDependants() throws CatalogException { - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if (!entry.getValue()) { - return false; - } - } - - return true; - } - - @Override - protected void createBaseTable() throws CatalogException { - Statement stmt = null; - Connection conn = null; - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - - // META - if (!baseTableMaps.get(TB_META)) { - String sql = super.readSchemaFile("common/meta.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql.toString()); - LOG.info("Table '" + TB_META + " is created."); - baseTableMaps.put(TB_META, true); - } - - // TABLE SPACES - if (!baseTableMaps.get(TB_SPACES)) { - String sql = readSchemaFile("tablespaces.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX TABLESPACES_IDX_NAME on TABLESPACES (SPACE_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_SPACES + "' is created."); - baseTableMaps.put(TB_SPACES, true); - } - - // DATABASES - if (!baseTableMaps.get(TB_DATABASES)) { - String sql = readSchemaFile("databases.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX DATABASES__IDX_NAME on DATABASES_ (DB_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_DATABASES + "' is created."); - baseTableMaps.put(TB_DATABASES, true); - } - - // TABLES - if (!baseTableMaps.get(TB_TABLES)) { - String sql = readSchemaFile("tables.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX TABLES_IDX_DB_ID on TABLES (DB_ID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX TABLES_IDX_TABLE_ID on TABLES (DB_ID, TABLE_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_TABLES + "' is created."); - baseTableMaps.put(TB_TABLES, true); - } - - // COLUMNS - if (!baseTableMaps.get(TB_COLUMNS)) { - String sql = readSchemaFile("columns.sql"); - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_COLUMNS + " is created."); - baseTableMaps.put(TB_COLUMNS, true); - } - - // OPTIONS - if (!baseTableMaps.get(TB_OPTIONS)) { - String sql = readSchemaFile("table_properties.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_OPTIONS + " is created."); - baseTableMaps.put(TB_OPTIONS, true); - } - - // INDEXES - if (!baseTableMaps.get(TB_INDEXES)) { - String sql = readSchemaFile("indexes.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql.toString()); - - sql = "CREATE UNIQUE INDEX INDEXES_IDX_DB_ID_NAME on INDEXES (DB_ID, INDEX_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX INDEXES_IDX_TID_COLUMN_NAME on INDEXES (TID, COLUMN_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_INDEXES + "' is created."); - baseTableMaps.put(TB_INDEXES, true); - } - - if (!baseTableMaps.get(TB_STATISTICS)) { - String sql = readSchemaFile("stats.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql.toString()); - } - - stmt.executeUpdate(sql.toString()); - LOG.info("Table '" + TB_STATISTICS + "' is created."); - baseTableMaps.put(TB_STATISTICS, true); - } - - // PARTITION_METHODS - if (!baseTableMaps.get(TB_PARTITION_METHODS)) { - String sql = readSchemaFile("partition_methods.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - - stmt.executeUpdate(sql); - LOG.info("Table '" + TB_PARTITION_METHODS + "' is created."); - baseTableMaps.put(TB_PARTITION_METHODS, true); - } - - // PARTITIONS - if (!baseTableMaps.get(TB_PARTTIONS)) { - String sql = readSchemaFile("partitions.sql"); - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE INDEX PARTITIONS_IDX_TID on PARTITIONS (TID)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - sql = "CREATE UNIQUE INDEX IDX_TID_NAME on PARTITIONS (TID, PARTITION_NAME)"; - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } - stmt.addBatch(sql); - - stmt.executeBatch(); - LOG.info("Table '" + TB_PARTTIONS + "' is created."); - baseTableMaps.put(TB_PARTTIONS, true); - } - - insertSchemaVersion(); - - } catch (SQLException se) { - throw new CatalogException("failed to create base tables for PostgreSQL catalog store", se); - } finally { - CatalogUtil.closeQuietly(stmt); - } - } - - @Override - protected void dropBaseTable() throws CatalogException { - Connection conn; - Statement stmt = null; - Map<String, Boolean> droppedTable = new HashMap<String, Boolean>(); - - try { - conn = getConnection(); - stmt = conn.createStatement(); - - for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) { - if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) { - String sql = "DROP TABLE " + entry.getKey(); - stmt.addBatch(sql); - droppedTable.put(entry.getKey(), true); - } - } - if(baseTableMaps.get(TB_TABLES)) { - String sql = "DROP TABLE " + TB_TABLES; - stmt.addBatch(sql); - droppedTable.put(TB_TABLES, true); - } - stmt.executeBatch(); - - for(String tableName : droppedTable.keySet()) { - LOG.info("Table '" + tableName + "' is dropped"); - } - } catch (SQLException se) { - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(stmt); - } - } - - @Override - public int getDriverVersion() { - return POSTGRESQL_STORE_VERSION; - } - - @Override - public String readSchemaFile(String path) throws CatalogException { - return super.readSchemaFile("postgresql/" + path); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java new file mode 100644 index 0000000..b09bfa8 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/XMLCatalogSchemaManager.java @@ -0,0 +1,698 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FilenameFilter; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.exception.CatalogException; +import org.apache.tajo.catalog.store.object.*; + +public class XMLCatalogSchemaManager { + protected final Log LOG = LogFactory.getLog(getClass()); + private String schemaPath; + private final Unmarshaller unmarshaller; + private StoreObject catalogStore; + + public XMLCatalogSchemaManager(String schemaPath) throws CatalogException { + this.schemaPath = schemaPath; + try { + JAXBContext context = JAXBContext.newInstance(StoreObject.class); + unmarshaller = context.createUnmarshaller(); + + loadFromXmlFiles(); + } catch (Exception e) { + throw new CatalogException(e.getMessage(), e); + } + } + + protected String getDropSQL(DatabaseObjectType type, String name) + throws CatalogException { + SQLObject foundDropQuery = null; + String sqlStatement = "DROP " + type.toString() + " " + name; + + for (SQLObject dropQuery: catalogStore.getDropStatements()) { + if (type == dropQuery.getType()) { + foundDropQuery = dropQuery; + break; + } + } + + if (foundDropQuery != null && foundDropQuery.getSql() != null && !foundDropQuery.getSql().isEmpty()) { + String dropStatement = foundDropQuery.getSql(); + StringBuffer sqlBuffer = new StringBuffer(dropStatement.length()+name.length()); + int identifier = dropStatement.indexOf('?'); + + sqlBuffer.append(dropStatement.substring(0, identifier)).append(name) + .append(dropStatement.substring(identifier+1)); + sqlStatement = sqlBuffer.toString(); + } + + return sqlStatement; + } + + public void dropBaseSchema(Connection conn) throws CatalogException { + if (!isLoaded()) { + throw new CatalogException("Schema files are not loaded yet."); + } + + List<DatabaseObject> failedObjects = new ArrayList<DatabaseObject>(); + Statement stmt = null; + + try { + stmt = conn.createStatement(); + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + + for (DatabaseObject object: catalogStore.getSchema().getObjects()) { + try { + if (DatabaseObjectType.TABLE == object.getType() || + DatabaseObjectType.SEQUENCE == object.getType() || + DatabaseObjectType.VIEW == object.getType()) { + stmt.executeUpdate(getDropSQL(object.getType(), object.getName())); + } + } catch (SQLException e) { + failedObjects.add(object); + } + } + + if (failedObjects.size() > 0) { + StringBuffer errorMessage = new StringBuffer(64); + errorMessage.append("Failed to drop database objects "); + + for (int idx = 0; idx < failedObjects.size(); idx++) { + DatabaseObject object = failedObjects.get(idx); + errorMessage.append(object.getType().toString()).append(" ") + .append(object.getName()); + if ((idx+1) < failedObjects.size()) { + errorMessage.append(','); + } + } + + LOG.warn(errorMessage.toString()); + } + } + + protected PreparedStatement getExistQuery(Connection conn, DatabaseObjectType type) + throws SQLException { + PreparedStatement pstmt = null; + + for (SQLObject existQuery: catalogStore.getExistQueries()) { + if (type.equals(existQuery.getType())) { + pstmt = conn.prepareStatement(existQuery.getSql()); + break; + } + } + + return pstmt; + } + + protected boolean checkExistance(Connection conn, DatabaseObjectType type, String... params) + throws SQLException { + boolean result = false; + DatabaseMetaData metadata = null; + PreparedStatement pstmt = null; + BaseSchema baseSchema = catalogStore.getSchema(); + + if (params == null || params.length < 1) { + throw new IllegalArgumentException("checkExistance function needs at least one argument."); + } + + switch(type) { + case DATA: + metadata = conn.getMetaData(); + ResultSet data = metadata.getUDTs(null, baseSchema.getSchemaName() != null && !baseSchema.getSchemaName().isEmpty()? + baseSchema.getSchemaName().toUpperCase():null, + params[0].toUpperCase(), null); + result = data.next(); + CatalogUtil.closeQuietly(data); + break; + case FUNCTION: + metadata = conn.getMetaData(); + ResultSet functions = metadata.getFunctions(null, baseSchema.getSchemaName() != null && !baseSchema.getSchemaName().isEmpty()? + baseSchema.getSchemaName().toUpperCase():null, + params[0].toUpperCase()); + result = functions.next(); + CatalogUtil.closeQuietly(functions); + break; + case INDEX: + if (params.length != 2) { + throw new IllegalArgumentException("Finding index object is needed two strings, table name and index name"); + } + + pstmt = getExistQuery(conn, type); + if (pstmt != null) { + result = checkExistanceByQuery(pstmt, baseSchema, params); + } else { + metadata = conn.getMetaData(); + ResultSet indexes = metadata.getIndexInfo(null, baseSchema.getSchemaName() != null + && !baseSchema.getSchemaName().isEmpty() ? baseSchema.getSchemaName().toUpperCase() : null, + params[0].toUpperCase(), false, true); + while (indexes.next()) { + if (indexes.getString("INDEX_NAME").equals(params[1].toUpperCase())) { + result = true; + break; + } + } + CatalogUtil.closeQuietly(indexes); + } + break; + case TABLE: + pstmt = getExistQuery(conn, type); + if (pstmt != null) { + result = checkExistanceByQuery(pstmt, baseSchema, params); + } else { + metadata = conn.getMetaData(); + ResultSet tables = metadata.getTables(null, baseSchema.getSchemaName() != null + && !baseSchema.getSchemaName().isEmpty() ? baseSchema.getSchemaName().toUpperCase() : null, + params[0].toUpperCase(), new String[] { "TABLE" }); + result = tables.next(); + CatalogUtil.closeQuietly(tables); + } + break; + case DOMAIN: + case OPERATOR: + case RULE: + case SEQUENCE: + case TRIGGER: + case VIEW: + pstmt = getExistQuery(conn, type); + + if (pstmt == null) { + throw new CatalogException("Finding " + type + + " type of database object is not supported on this database system."); + } + + result = checkExistanceByQuery(pstmt, baseSchema, params); + break; + } + + return result; + } + + private boolean checkExistanceByQuery(PreparedStatement pstmt, BaseSchema baseSchema, + String... params) throws SQLException { + int paramIdx = 1; + boolean result = false; + + if (baseSchema.getSchemaName() != null && !baseSchema.getSchemaName().isEmpty()) { + pstmt.setString(paramIdx++, baseSchema.getSchemaName().toUpperCase()); + } + + for (; paramIdx <= pstmt.getParameterMetaData().getParameterCount(); paramIdx++) { + pstmt.setString(paramIdx, params[paramIdx-1]); + } + + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + if (rs.getString(1).equals(params[params.length-1].toUpperCase())) { + result = true; + break; + } + } + CatalogUtil.closeQuietly(rs); + return result; + } + + public void createBaseSchema(Connection conn) throws CatalogException { + Statement stmt; + + if (!isLoaded()) { + throw new CatalogException("Database schema files are not loaded."); + } + + try { + stmt = conn.createStatement(); + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + + for (DatabaseObject object: catalogStore.getSchema().getObjects()) { + try { + String[] params; + if (DatabaseObjectType.INDEX == object.getType()) { + params = new String[2]; + params[0] = object.getDependsOn(); + params[1] = object.getName(); + } + else { + params = new String[1]; + params[0] = object.getName(); + } + + if (checkExistance(conn, object.getType(), params)) { + LOG.info("Skip to create " + object.getName() + " databse object. Already exists."); + } + else { + stmt.executeUpdate(object.getSql()); + LOG.info(object.getName() + " " + object.getType() + " is created."); + } + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + } + } + + public void upgradeBaseSchema(Connection conn, int currentVersion) { + if (!isLoaded()) { + throw new CatalogException("Database schema files are not loaded."); + } + + final List<SchemaPatch> candidatePatches = new ArrayList<SchemaPatch>(); + Statement stmt; + + for (SchemaPatch patch: this.catalogStore.getPatches()) { + if (currentVersion >= patch.getPriorVersion()) { + candidatePatches.add(patch); + } + } + + Collections.sort(candidatePatches); + try { + stmt = conn.createStatement(); + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + + for (SchemaPatch patch: candidatePatches) { + for (DatabaseObject object: patch.getObjects()) { + try { + stmt.executeUpdate(object.getSql()); + LOG.info(object.getName() + " " + object.getType() + " was created or altered."); + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + } + } + } + + public boolean isInitialized(Connection conn) throws CatalogException { + if (!isLoaded()) { + throw new CatalogException("Database schema files are not loaded."); + } + + boolean result = true; + + for (DatabaseObject object: catalogStore.getSchema().getObjects()) { + try { + if (DatabaseObjectType.INDEX == object.getType()) { + result &= checkExistance(conn, object.getType(), object.getDependsOn(), object.getName()); + } + else { + result &= checkExistance(conn, object.getType(), object.getName()); + } + } catch (SQLException e) { + throw new CatalogException(e.getMessage(), e); + } + + if (!result) { + break; + } + } + return result; + } + + protected String[] listFileResources(URL dirURL, String schemaPath, FilenameFilter filter) + throws URISyntaxException, IOException { + String[] files = new String[0]; + String[] tempFiles; + List<String> filesList = new ArrayList<String>(); + File dirFile = new File(dirURL.toURI()); + + tempFiles = dirFile.list(filter); + files = new String[tempFiles.length]; + + for (String fileName: tempFiles) { + filesList.add(schemaPath+"/"+fileName); + } + + files = filesList.toArray(files); + return files; + } + + protected String[] listJarResources(URL dirURL, FilenameFilter filter) + throws IOException, URISyntaxException { + String[] files = new String[0]; + String spec = dirURL.getFile(); + int seperator = spec.indexOf("!/"); + + if (seperator == -1) { + return files; + } + + URL jarFileURL = new URL(spec.substring(0, seperator)); + JarFile jarFile = new JarFile(jarFileURL.toURI().getPath()); + Set<String> filesSet = new HashSet<String>(); + + try { + Enumeration<JarEntry> entries = jarFile.entries(); + + while (entries.hasMoreElements()) { + JarEntry entry = entries.nextElement(); + + if (entry.isDirectory()) { + continue; + } + + String entryName = entry.getName(); + + if (entryName.indexOf(schemaPath) > -1 && + filter.accept(null, entryName)) { + filesSet.add(entryName); + } + } + } finally { + jarFile.close(); + } + + if (!filesSet.isEmpty()) { + files = new String[filesSet.size()]; + files = filesSet.toArray(files); + } + + return files; + } + + protected String[] listResources() throws IOException, URISyntaxException { + String[] files = new String[0]; + URL dirURL = ClassLoader.getSystemResource(schemaPath); + FilenameFilter fileFilter = new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + return ((name.lastIndexOf('.') > -1) && + (".xml".equalsIgnoreCase(name.substring(name.lastIndexOf('.'))))); + } + }; + + if (dirURL == null) { + throw new FileNotFoundException(schemaPath); + } + + if (dirURL.getProtocol().equals("file")) { + files = listFileResources(dirURL, schemaPath, fileFilter); + } + + if (dirURL.getProtocol().equals("jar")) { + files = listJarResources(dirURL, fileFilter); + } + + return files; + } + + protected void mergeXmlSchemas(final List<StoreObject> storeObjects) throws CatalogException { + if (storeObjects.size() <= 0) { + throw new CatalogException("Unable to find a schema file."); + } + + this.catalogStore = new StoreObjectsMerger(storeObjects).merge(); + } + + protected void loadFromXmlFiles() throws IOException, XMLStreamException, URISyntaxException { + XMLInputFactory xmlIf = XMLInputFactory.newInstance(); + final List<StoreObject> storeObjects = new ArrayList<StoreObject>(); + + xmlIf.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE); + + for (String resname: listResources()) { + URL filePath = ClassLoader.getSystemResource(resname); + + if (filePath == null) { + throw new FileNotFoundException(resname); + } + + loadFromXmlFile(xmlIf, filePath, storeObjects); + } + + mergeXmlSchemas(storeObjects); + } + + protected void loadFromXmlFile(XMLInputFactory xmlIf, URL filePath, List<StoreObject> storeObjects) + throws IOException, XMLStreamException { + XMLStreamReader xmlReader; + + xmlReader = xmlIf.createXMLStreamReader(filePath.openStream()); + + try { + while (xmlReader.hasNext()) { + if (xmlReader.next() == XMLStreamConstants.START_ELEMENT && + "store".equals(xmlReader.getLocalName())) { + StoreObject catalogStore = loadCatalogStore(xmlReader); + if (catalogStore != null) { + storeObjects.add(catalogStore); + } + } + } + } finally { + try { + xmlReader.close(); + } catch (XMLStreamException ignored) { } + } + } + + protected StoreObject loadCatalogStore(XMLStreamReader xmlReader) throws XMLStreamException { + try { + JAXBElement<StoreObject> elem = unmarshaller.unmarshal(xmlReader, StoreObject.class); + return elem.getValue(); + } catch (JAXBException e) { + throw new XMLStreamException(e.getMessage(), xmlReader.getLocation(), e); + } + } + + protected StoreObject getCatalogStore() { + return catalogStore; + } + + public boolean isLoaded() { + return catalogStore!=null; + } + + private class StoreObjectsMerger { + + private final List<StoreObject> storeObjects = new ArrayList<StoreObject>(); + private final StoreObject targetStore = new StoreObject(); + + public StoreObjectsMerger(List<StoreObject> schemaObjects) { + this.storeObjects.addAll(schemaObjects); + } + + protected void copySchemaInfo(StoreObject sourceStore) { + if (sourceStore.getSchema().getSchemaName() != null && + !sourceStore.getSchema().getSchemaName().isEmpty()) { + if (targetStore.getSchema().getSchemaName() != null && + !targetStore.getSchema().getSchemaName().isEmpty() && + !targetStore.getSchema().getSchemaName().equalsIgnoreCase( + sourceStore.getSchema().getSchemaName())) { + throw new CatalogException("different schema names are specified. One is " + + sourceStore.getSchema().getSchemaName() + " and other is " + + targetStore.getSchema().getSchemaName()); + } + + if (targetStore.getSchema().getSchemaName() == null || + targetStore.getSchema().getSchemaName().isEmpty()) { + targetStore.getSchema().setSchemaName(sourceStore.getSchema().getSchemaName()); + } + } + + if (sourceStore.getSchema().getVersion() > -1 && + targetStore.getSchema().getVersion() < sourceStore.getSchema().getVersion()) { + targetStore.getSchema().setVersion(sourceStore.getSchema().getVersion()); + } + } + + private List<DatabaseObject> createListAndFillNull(int maxIdx) { + DatabaseObject[] objects = new DatabaseObject[maxIdx]; + Arrays.fill(objects, null); + return new ArrayList<DatabaseObject>(Arrays.asList(objects)); + } + + protected List<DatabaseObject> mergeDatabaseObjects(List<DatabaseObject> objects) { + int maxIdx = -1; + + for (DatabaseObject object: objects) { + maxIdx = Math.max(maxIdx, object.getOrder()); + } + + final List<DatabaseObject> orderedObjects = createListAndFillNull(maxIdx); + final List<DatabaseObject> unorderedObjects = new ArrayList<DatabaseObject>(); + final List<DatabaseObject> mergedObjects = new ArrayList<DatabaseObject>(); + + for (DatabaseObject object: objects) { + if (object.getOrder() > -1) { + int objIdx = object.getOrder(); + + if (objIdx < orderedObjects.size() && orderedObjects.get(objIdx) != null) { + throw new CatalogException("This catalog configuration contains duplicated order of DatabaseObject"); + } + + orderedObjects.add(objIdx, object); + } + else { + unorderedObjects.add(object); + } + } + + for (DatabaseObject object: orderedObjects) { + if (object != null) { + mergedObjects.add(object); + } + } + + for (DatabaseObject object: unorderedObjects) { + if (object != null) { + mergedObjects.add(object); + } + } + + return mergedObjects; + } + + protected void validatePatch(List<SchemaPatch> patches, SchemaPatch testPatch) { + if (testPatch.getPriorVersion() > testPatch.getNextVersion()) { + throw new CatalogException("Prior version cannot proceed to next version of patch."); + } + + for (SchemaPatch patch: patches) { + if (testPatch.equals(patch)) { + continue; + } + + if (testPatch.getPriorVersion() == patch.getPriorVersion()) { + LOG.warn("It has the same prior version (" + testPatch.getPriorVersion() + ") of patch."); + + if (testPatch.getNextVersion() == patch.getNextVersion()) { + throw new CatalogException("Duplicate versions of patch found. It will terminate Catalog Store. "); + } + } + + if (testPatch.getNextVersion() == patch.getNextVersion()) { + LOG.warn("It has the same next version (" + testPatch.getPriorVersion() + ") of patch."); + } + } + } + + protected void mergePatches(List<SchemaPatch> patches) { + final List<DatabaseObject> objects = new ArrayList<DatabaseObject>(); + + Collections.sort(patches); + + for (SchemaPatch patch: patches) { + validatePatch(patches, patch); + + objects.clear(); + List<DatabaseObject> tempObjects = new ArrayList<DatabaseObject>(); + tempObjects.addAll(patch.getObjects()); + patch.clearObjects(); + patch.addObjects(mergeDatabaseObjects(tempObjects)); + + targetStore.addPatch(patch); + } + } + + protected void validateSQLObject(List<SQLObject> queries, SQLObject testQuery) { + int occurredCount = 0; + + for (SQLObject query: queries) { + if (query.getType() == testQuery.getType()) { + occurredCount++; + } + } + + if (occurredCount > 1) { + throw new CatalogException("Duplicate Query type (" + testQuery.getType() + ") has found."); + } + } + + protected void mergeExistQueries(List<SQLObject> queries) { + for (SQLObject query: queries) { + validateSQLObject(queries, query); + + targetStore.addExistQuery(query); + } + } + + protected void mergeDropStatements(List<SQLObject> queries) { + for (SQLObject query: queries) { + validateSQLObject(queries, query); + + targetStore.addDropStatement(query); + } + } + + public StoreObject merge() { + boolean alreadySetDatabaseObject = false; + + // first pass + for (StoreObject store : this.storeObjects) { + copySchemaInfo(store); + } + + // second pass + for (StoreObject store: this.storeObjects) { + if (store.getSchema().getVersion() == targetStore.getSchema().getVersion() && + !alreadySetDatabaseObject) { + BaseSchema targetSchema = targetStore.getSchema(); + targetSchema.clearObjects(); + targetSchema.addObjects(mergeDatabaseObjects(store.getSchema().getObjects())); + + alreadySetDatabaseObject = true; + } + + mergePatches(store.getPatches()); + mergeExistQueries(store.getExistQueries()); + mergeDropStatements(store.getDropStatements()); + } + + return this.targetStore; + } + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java new file mode 100644 index 0000000..3c9a3e2 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/BaseSchema.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; + +@XmlAccessorType(XmlAccessType.FIELD) +public class BaseSchema implements Comparable<BaseSchema> { + + @XmlAttribute(name="version",required=true) + private int version; + @XmlAttribute(name="schemaname") + private String schemaName; + @XmlElementWrapper(name="objects",namespace="http://tajo.apache.org/catalogstore") + @XmlElement(name="Object",namespace="http://tajo.apache.org/catalogstore") + private final List<DatabaseObject> objects = new ArrayList<DatabaseObject>(); + + public int getVersion() { + return version; + } + public void setVersion(int version) { + this.version = version; + } + public String getSchemaName() { + return schemaName; + } + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + public List<DatabaseObject> getObjects() { + return objects; + } + public void addObject(DatabaseObject object) { + this.objects.add(object); + } + public void addObjects(Collection<DatabaseObject> objects) { + this.objects.addAll(objects); + } + public void clearObjects() { + this.objects.clear(); + } + + @Override + public int compareTo(BaseSchema o) { + return (int) Math.signum(getVersion()-o.getVersion()); + } + @Override + public String toString() { + return "BaseSchema [version=" + version + ", schemaName=" + schemaName + ", objects=" + objects + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java new file mode 100644 index 0000000..4132018 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObject.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; + +@XmlAccessorType(XmlAccessType.FIELD) +public class DatabaseObject implements Comparable<DatabaseObject> { + + @XmlAttribute(name="order") + private int order=-1; + @XmlAttribute(name="type",required=true) + private DatabaseObjectType type; + @XmlAttribute(name="name",required=true) + private String name; + @XmlAttribute(name="dependsOn") + private String dependsOn; + @XmlElement(name="sql",namespace="http://tajo.apache.org/catalogstore") + private String sql; + + public int getOrder() { + return order; + } + public void setOrder(int order) { + this.order = order; + } + public DatabaseObjectType getType() { + return type; + } + public void setType(DatabaseObjectType type) { + this.type = type; + } + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + public String getDependsOn() { + return dependsOn; + } + public void setDependsOn(String dependsOn) { + this.dependsOn = dependsOn; + } + public String getSql() { + return sql; + } + public void setSql(String sqlStatement) { + this.sql = sqlStatement; + } + @Override + public String toString() { + return "DatabaseObject [order=" + order + ", type=" + type + ", name=" + name + ", dependsOn=" + dependsOn + + ", sql=" + sql + "]"; + } + @Override + public int compareTo(DatabaseObject o) { + return (int) Math.signum(getOrder() - o.getOrder()); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java new file mode 100644 index 0000000..b41dfb9 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/DatabaseObjectType.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import javax.xml.bind.annotation.XmlEnum; +import javax.xml.bind.annotation.XmlEnumValue; +import javax.xml.bind.annotation.XmlType; + +@XmlType(name="DatabaseObjectsType") +@XmlEnum(String.class) +public enum DatabaseObjectType { + @XmlEnumValue("table") + TABLE, + @XmlEnumValue("view") + VIEW, + @XmlEnumValue("function") + FUNCTION, + @XmlEnumValue("operator") + OPERATOR, + @XmlEnumValue("data") + DATA, + @XmlEnumValue("domain") + DOMAIN, + @XmlEnumValue("trigger") + TRIGGER, + @XmlEnumValue("rule") + RULE, + @XmlEnumValue("sequence") + SEQUENCE, + @XmlEnumValue("index") + INDEX; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java new file mode 100644 index 0000000..e22dc73 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SQLObject.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; + +@XmlAccessorType(XmlAccessType.FIELD) +public class SQLObject { + + @XmlAttribute + private DatabaseObjectType type; + @XmlElement(name="sql",namespace="http://tajo.apache.org/catalogstore") + private String sql; + + public DatabaseObjectType getType() { + return type; + } + public void setType(DatabaseObjectType type) { + this.type = type; + } + public String getSql() { + return sql; + } + public void setSql(String sql) { + this.sql = sql; + } + + @Override + public String toString() { + return "ExistQuery [type=" + type + ", sql=" + sql + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java new file mode 100644 index 0000000..3859d03 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/SchemaPatch.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import java.util.ArrayList; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; + +@XmlAccessorType(XmlAccessType.FIELD) +public class SchemaPatch implements Comparable<SchemaPatch> { + + @XmlAttribute + private int priorVersion; + @XmlAttribute + private int nextVersion; + @XmlElementWrapper(name="objects",namespace="http://tajo.apache.org/catalogstore") + @XmlElement(name="Object",namespace="http://tajo.apache.org/catalogstore") + private final List<DatabaseObject> objects = new ArrayList<DatabaseObject>(); + + public int getPriorVersion() { + return priorVersion; + } + public void setPriorVersion(int priorVersion) { + this.priorVersion = priorVersion; + } + public int getNextVersion() { + return nextVersion; + } + public void setNextVersion(int nextVersion) { + this.nextVersion = nextVersion; + } + public List<DatabaseObject> getObjects() { + return objects; + } + public void addObject(DatabaseObject object) { + this.objects.add(object); + } + public void addObjects(List<DatabaseObject> objects) { + this.objects.addAll(objects); + } + public void clearObjects() { + this.objects.clear(); + } + @Override + public int compareTo(SchemaPatch o) { + int result = (int) Math.signum(getPriorVersion()-o.getPriorVersion()); + if (result == 0) { + result = (int) Math.signum(getNextVersion()-o.getNextVersion()); + } + return result; + } + @Override + public String toString() { + return "SchemaPatch [priorVersion=" + priorVersion + ", nextVersion=" + nextVersion + ", objects=" + objects + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/00555685/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java new file mode 100644 index 0000000..c83ee3b --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/object/StoreObject.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.store.object; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; + +@XmlAccessorType(XmlAccessType.FIELD) +public class StoreObject { + + @XmlElement(name="base",namespace="http://tajo.apache.org/catalogstore") + private BaseSchema schema = new BaseSchema(); + @XmlElementWrapper(name="existQueries",namespace="http://tajo.apache.org/catalogstore") + @XmlElement(name="existQuery",namespace="http://tajo.apache.org/catalogstore") + private final List<SQLObject> existQueries = new ArrayList<SQLObject>(); + @XmlElementWrapper(name="dropStatements",namespace="http://tajo.apache.org/catalogstore") + @XmlElement(name="dropStatement",namespace="http://tajo.apache.org/catalogstore") + private final List<SQLObject> dropStatements = new ArrayList<SQLObject>(); + @XmlElementWrapper(name="patches",namespace="http://tajo.apache.org/catalogstore") + @XmlElement(name="patch",namespace="http://tajo.apache.org/catalogstore") + private final List<SchemaPatch> patches = new ArrayList<SchemaPatch>(); + + public BaseSchema getSchema() { + return schema; + } + public void setSchema(BaseSchema schema) { + this.schema = schema; + } + public List<SQLObject> getExistQueries() { + return existQueries; + } + public void addExistQuery(SQLObject query) { + this.existQueries.add(query); + } + public void addExistQueries(Collection<SQLObject> queries) { + this.existQueries.addAll(queries); + } + public void clearExistQueries() { + this.existQueries.clear(); + } + public List<SQLObject> getDropStatements() { + return dropStatements; + } + public void addDropStatement(SQLObject query) { + this.dropStatements.add(query); + } + public void addDropStatements(Collection<SQLObject> queries) { + this.dropStatements.addAll(queries); + } + public void clearDropStatements() { + this.dropStatements.clear(); + } + public List<SchemaPatch> getPatches() { + return patches; + } + public void addPatch(SchemaPatch patch) { + this.patches.add(patch); + } + public void addPatches(Collection<SchemaPatch> patches) { + this.patches.addAll(patches); + } + public void clearPatches() { + this.patches.clear(); + } +}
