Repository: oozie
Updated Branches:
  refs/heads/master b074b2cee -> 9839fb292


OOZIE-2245 Service to periodically check database schema (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9839fb29
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9839fb29
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9839fb29

Branch: refs/heads/master
Commit: 9839fb292e5483f4d101849483109208de75f9c3
Parents: b074b2c
Author: Robert Kanter <[email protected]>
Authored: Thu Aug 27 10:15:34 2015 -0700
Committer: Robert Kanter <[email protected]>
Committed: Thu Aug 27 10:15:34 2015 -0700

----------------------------------------------------------------------
 .../oozie/command/SchemaCheckXCommand.java      | 529 +++++++++++++++++++
 .../oozie/service/SchemaCheckerService.java     | 125 +++++
 core/src/main/resources/oozie-default.xml       |  21 +-
 docs/src/site/twiki/AG_Monitoring.twiki         |  26 +
 release-log.txt                                 |   1 +
 5 files changed, 701 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java 
b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java
new file mode 100644
index 0000000..1cc086e
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.oozie.BinaryBlob;
+import org.apache.oozie.BundleActionBean;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.SLAEventBean;
+import org.apache.oozie.StringBlob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonSLAEvent;
+import org.apache.oozie.service.SchemaCheckerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLARegistrationBean;
+import org.apache.oozie.sla.SLASummaryBean;
+import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.XLog;
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Column;
+import javax.persistence.DiscriminatorColumn;
+import javax.persistence.DiscriminatorType;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.Table;
+import java.lang.reflect.Field;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class SchemaCheckXCommand extends XCommand<Void> {
+    private XLog LOG = XLog.getLog(SchemaCheckXCommand.class);
+
+    private String dbType;
+    private String url;
+    private String user;
+    private String pass;
+    private boolean ignoreExtras;
+
+    public SchemaCheckXCommand(String dbType, String url, String user, String 
pass, boolean ignoreExtras) {
+        super("schema-check", "schema-check", 0);
+        this.dbType = dbType;
+        this.url = url;
+        this.user = user;
+        this.pass = pass;
+        this.ignoreExtras = ignoreExtras;
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        Connection conn = null;
+        LOG.info("About to check database schema");
+        Date startTime = new Date();
+        boolean problem = false;
+        try {
+            conn = DriverManager.getConnection(url, user, pass);
+            String catalog = conn.getCatalog();
+            DatabaseMetaData metaData = conn.getMetaData();
+
+            Map<String, Class<? extends JsonBean>> tableClasses = new 
HashMap<String, Class<? extends JsonBean>>();
+            tableClasses.put(getTableName(BundleActionBean.class), 
BundleActionBean.class);
+            tableClasses.put(getTableName(BundleJobBean.class), 
BundleJobBean.class);
+            tableClasses.put(getTableName(CoordinatorActionBean.class), 
CoordinatorActionBean.class);
+            tableClasses.put(getTableName(CoordinatorJobBean.class), 
CoordinatorJobBean.class);
+            tableClasses.put(getTableName(JsonSLAEvent.class), 
JsonSLAEvent.class);
+            tableClasses.put(getTableName(SLARegistrationBean.class), 
SLARegistrationBean.class);
+            tableClasses.put(getTableName(SLASummaryBean.class), 
SLASummaryBean.class);
+            tableClasses.put(getTableName(WorkflowActionBean.class), 
WorkflowActionBean.class);
+            tableClasses.put(getTableName(WorkflowJobBean.class), 
WorkflowJobBean.class);
+
+            boolean tableProblem = checkTables(metaData, catalog, 
tableClasses.keySet());
+            problem = problem | tableProblem;
+            if (!tableProblem) {
+                for (Map.Entry<String, Class<? extends JsonBean>> table : 
tableClasses.entrySet()) {
+                        TableInfo ti = new TableInfo(table.getValue(), dbType);
+                        boolean columnProblem = checkColumns(metaData, 
catalog, table.getKey(), ti.columnTypes);
+                        problem = problem | columnProblem;
+                        if (!columnProblem) {
+                            boolean primaryKeyProblem = 
checkPrimaryKey(metaData, catalog, table.getKey(), ti.primaryKeyColumn);
+                            problem = problem | primaryKeyProblem;
+                            boolean indexProblem = checkIndexes(metaData, 
catalog, table.getKey(), ti.indexedColumns);
+                            problem = problem | indexProblem;
+                        }
+                    }
+            }
+            if (problem) {
+                LOG.error("Database schema is BAD! Check previous error log 
messages for details");
+            } else {
+                LOG.info("Database schema is GOOD");
+            }
+        } catch (SQLException sqle) {
+            LOG.error("An Exception occured while talking to the database: " + 
sqle.getMessage(), sqle);
+            problem = true;
+        } finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (Exception e) {
+                    LOG.error("An Exception occured while disconnecting from 
the database: " + e.getMessage(), e);
+                }
+            }
+            
Services.get().get(SchemaCheckerService.class).updateInstrumentation(problem, 
startTime);
+        }
+        return null;
+    }
+
+    private boolean checkTables(DatabaseMetaData metaData, String catalog, 
final Collection<String> expectedTablesRaw)
+            throws SQLException {
+        boolean problem = false;
+        Set<String> expectedTables = new HashSet<String>(expectedTablesRaw);
+        expectedTables.add(caseTableName("oozie_sys"));
+        expectedTables.add(caseTableName("openjpa_sequence_table"));
+        expectedTables.add(caseTableName("validate_conn"));
+        // Oracle returns > 1000 tables if we don't have the schema "OOZIE"; 
MySQL and Postgres don't want this
+        String schema = null;
+        if (dbType.equals("oracle")) {
+            schema = "OOZIE";
+        }
+        ResultSet rs = metaData.getTables(catalog, schema, null, new 
String[]{"TABLE"});
+        Set<String> foundTables = new HashSet<String>();
+        while (rs.next()) {
+            String tabName = rs.getString("TABLE_NAME");
+            if (tabName != null) {
+                foundTables.add(tabName);
+            }
+        }
+        Collection missingTables = CollectionUtils.subtract(expectedTables, 
foundTables);
+        if (!missingTables.isEmpty()) {
+            LOG.error("Found [{0}] missing tables: {1}", missingTables.size(), 
Arrays.toString(missingTables.toArray()));
+            problem = true;
+        } else if (LOG.isDebugEnabled()) {
+            LOG.debug("No missing tables found: {0}", 
Arrays.toString(expectedTables.toArray()));
+        }
+        if (!ignoreExtras) {
+            Collection extraTables = CollectionUtils.subtract(foundTables, 
expectedTables);
+            if (!extraTables.isEmpty()) {
+                LOG.error("Found [{0}] extra tables: {1}", extraTables.size(), 
Arrays.toString(extraTables.toArray()));
+                problem = true;
+            } else {
+                LOG.debug("No extra tables found");
+            }
+        }
+        return problem;
+    }
+
+    private boolean checkColumns(DatabaseMetaData metaData, String catalog, 
String table,
+                                 Map<String, Integer> expectedColumnTypes) 
throws SQLException {
+        boolean problem = false;
+        Map<String, Pair<Integer, String>> foundColumns = new HashMap<String, 
Pair<Integer, String>>();
+        ResultSet rs = metaData.getColumns(catalog, null, table, null);
+        while (rs.next()) {
+            String colName = rs.getString("COLUMN_NAME");
+            Integer dataType = rs.getInt("DATA_TYPE");
+            String colDef = rs.getString("COLUMN_DEF");
+            if (colName != null) {
+                foundColumns.put(colName, new Pair<Integer, String>(dataType, 
colDef));
+            }
+        }
+        Collection missingColumns = 
CollectionUtils.subtract(expectedColumnTypes.keySet(), foundColumns.keySet());
+        if (!missingColumns.isEmpty()) {
+            LOG.error("Found [{0}] missing columns in table [{1}]: {2}",
+                    missingColumns.size(), table, 
Arrays.toString(missingColumns.toArray()));
+            problem = true;
+        } else {
+            for (Map.Entry<String, Integer> ent : 
expectedColumnTypes.entrySet()) {
+                if 
(!foundColumns.get(ent.getKey()).getFist().equals(ent.getValue())) {
+                    LOG.error("Expected column [{0}] in table [{1}] to have 
type [{2}], but found type [{3}]",
+                            ent.getKey(), table, 
getSQLTypeFromInt(ent.getValue()),
+                            
getSQLTypeFromInt(foundColumns.get(ent.getKey()).getFist()));
+                    problem = true;
+                } else if (foundColumns.get(ent.getKey()).getSecond() != null) 
{
+                    LOG.error("Expected column [{0}] in table [{1}] to have 
default value [NULL], but found default vale [{2}]",
+                            ent.getKey(), table, 
foundColumns.get(ent.getKey()).getSecond());
+                    problem = true;
+                } else {
+                    LOG.debug("Found column [{0}] in table [{1}] with type 
[{2}] and default value [NULL]",
+                            ent.getKey(), table, 
getSQLTypeFromInt(ent.getValue()));
+                }
+            }
+        }
+        if (!ignoreExtras) {
+            Collection extraColumns = 
CollectionUtils.subtract(foundColumns.keySet(), expectedColumnTypes.keySet());
+            if (!extraColumns.isEmpty()) {
+                LOG.error("Found [{0}] extra columns in table [{1}]: {2}",
+                        extraColumns.size(), table, 
Arrays.toString(extraColumns.toArray()));
+                problem = true;
+            } else {
+                LOG.debug("No extra columns found in table [{0}]", table);
+            }
+        }
+        return problem;
+    }
+
+    private boolean checkPrimaryKey(DatabaseMetaData metaData, String catalog, 
String table, String expectedPrimaryKeyColumn)
+            throws SQLException {
+        boolean problem = false;
+        ResultSet rs = metaData.getPrimaryKeys(catalog, null, table);
+        if (!rs.next()) {
+            LOG.error("Expected column [{0}] to be the primary key in table 
[{1}], but none were found",
+                    expectedPrimaryKeyColumn, table);
+            problem = true;
+        } else {
+            String foundPrimaryKeyColumn = rs.getString("COLUMN_NAME");
+            if (!foundPrimaryKeyColumn.equals(expectedPrimaryKeyColumn)) {
+                LOG.error("Expected column [{0}] to be the primary key in 
table [{1}], but found column [{2}] instead",
+                        expectedPrimaryKeyColumn, table, 
foundPrimaryKeyColumn);
+                problem = true;
+            } else {
+                LOG.debug("Found column [{0}] to be the primary key in table 
[{1}]", expectedPrimaryKeyColumn, table);
+            }
+        }
+        return problem;
+    }
+
+    private boolean checkIndexes(DatabaseMetaData metaData, String catalog, 
String table, Set<String> expectedIndexedColumns)
+            throws SQLException {
+        boolean problem = false;
+        Set<String> foundIndexedColumns = new HashSet<String>();
+        ResultSet rs = metaData.getIndexInfo(catalog, null, table, false, 
true);
+        while (rs.next()) {
+            String colName = rs.getString("COLUMN_NAME");
+            if (colName != null) {
+                foundIndexedColumns.add(colName);
+            }
+        }
+        Collection missingIndexColumns = 
CollectionUtils.subtract(expectedIndexedColumns, foundIndexedColumns);
+        if (!missingIndexColumns.isEmpty()) {
+            LOG.error("Found [{0}] missing indexes for columns in table [{1}]: 
{2}",
+                    missingIndexColumns.size(), table, 
Arrays.toString(missingIndexColumns.toArray()));
+            problem = true;
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("No missing indexes found in table [{0}]: {1}",
+                        table, 
Arrays.toString(expectedIndexedColumns.toArray()));
+            }
+        }
+        if (!ignoreExtras) {
+            Collection extraIndexColumns = 
CollectionUtils.subtract(foundIndexedColumns, expectedIndexedColumns);
+            if (!extraIndexColumns.isEmpty()) {
+                LOG.error("Found [{0}] extra indexes for columns in table 
[{1}]: {2}",
+                        extraIndexColumns.size(), table, 
Arrays.toString(extraIndexColumns.toArray()));
+                problem = true;
+            } else {
+                LOG.debug("No extra indexes found in table [{0}]", table);
+            }
+        }
+        return problem;
+    }
+
+    private String getTableName(Class<? extends JsonBean> clazz) {
+        Table tabAnn = clazz.getAnnotation(Table.class);
+        if (tabAnn != null) {
+            return caseTableName(tabAnn.name());
+        }
+        return null;
+    }
+
+    private String caseTableName(String name) {
+        // MySQL and Oracle wants table names in all caps
+        if (dbType.equals("mysql") || dbType.equals("oracle")) {
+            return name.toUpperCase();
+        }
+        // Postgres wants table names in all lowers
+        if (dbType.equals("postgresql")) {
+            return name.toLowerCase();
+        }
+        return name;
+    }
+
+    private String getSQLTypeFromInt(int t) {
+        switch (t) {
+            case Types.BIT:
+                return "BIT";
+            case Types.TINYINT:
+                return "TINYINT";
+            case Types.SMALLINT:
+                return "SMALLINT";
+            case Types.INTEGER:
+                return "INTEGER";
+            case Types.BIGINT:
+                return "BIGINT";
+            case Types.FLOAT:
+                return "FLOAT";
+            case Types.REAL:
+                return "REAL";
+            case Types.DOUBLE:
+                return "DOUBLE";
+            case Types.NUMERIC:
+                return "NUMERIC";
+            case Types.DECIMAL:
+                return "DECIMAL";
+            case Types.CHAR:
+                return "CHAR";
+            case Types.VARCHAR:
+                return "VARCHAR";
+            case Types.LONGVARCHAR:
+                return "LONGVARCHAR";
+            case Types.DATE:
+                return "DATE";
+            case Types.TIME:
+                return "TIME";
+            case Types.TIMESTAMP:
+                return "TIMESTAMP";
+            case Types.BINARY:
+                return "BINARY";
+            case Types.VARBINARY:
+                return "VARBINARY";
+            case Types.LONGVARBINARY:
+                return "LONGVARBINARY";
+            case Types.NULL:
+                return "NULL";
+            case Types.OTHER:
+                return "OTHER";
+            case Types.JAVA_OBJECT:
+                return "JAVA_OBJECT";
+            case Types.DISTINCT:
+                return "DISTINCT";
+            case Types.STRUCT:
+                return "STRUCT";
+            case Types.ARRAY:
+                return "ARRAY";
+            case Types.BLOB:
+                return "BLOB";
+            case Types.CLOB:
+                return "CLOB";
+            case Types.REF:
+                return "REF";
+            case Types.DATALINK:
+                return "DATALINK";
+            case Types.BOOLEAN:
+                return "BOOLEAN";
+            case Types.ROWID:
+                return "ROWID";
+            case Types.NCHAR:
+                return "NCHAR";
+            case Types.NVARCHAR:
+                return "NVARCHAR";
+            case Types.LONGNVARCHAR:
+                return "LONGNVARCHAR";
+            case Types.NCLOB:
+                return "NCLOB";
+            case Types.SQLXML:
+                return "SQLXML";
+            default:
+                return "unknown";
+        }
+    }
+
+    private static class TableInfo {
+        String primaryKeyColumn;
+        Map<String, Integer> columnTypes;
+        Set<String> indexedColumns;
+
+        public TableInfo(Class<? extends JsonBean> clazz, String dbType) {
+            columnTypes = new HashMap<String, Integer>();
+            indexedColumns = new HashSet<String>();
+            populate(clazz, dbType);
+            // The "SLA_EVENTS" table is made up of two classes (JsonSLAEvent 
and SLAEventBean), and the reflection doesn't pick up
+            // from both automatically, so we have to manually do this
+            if (clazz.equals(JsonSLAEvent.class)) {
+                populate(SLAEventBean.class, dbType);
+            }
+        }
+
+        private void populate(Class<? extends JsonBean> clazz, String dbType) {
+            Field[] fields = clazz.getDeclaredFields();
+            for (Field field : fields) {
+                Column colAnn = field.getAnnotation(Column.class);
+                if (colAnn != null) {
+                    String name = caseColumnName(colAnn.name(), dbType);
+                    boolean isLob = (field.getAnnotation(Lob.class) != null);
+                    Integer type = getSQLType(field.getType(), isLob, dbType);
+                    columnTypes.put(name, type);
+                    boolean isIndex = (field.getAnnotation(Index.class) != 
null);
+                    if (isIndex) {
+                        indexedColumns.add(name);
+                    }
+                    boolean isPrimaryKey = (field.getAnnotation(Id.class) != 
null);
+                    if (isPrimaryKey) {
+                        indexedColumns.add(name);
+                        primaryKeyColumn = name;
+                    }
+                } else {
+                    // Some Id fields don't have an @Column annotation
+                    Id idAnn = field.getAnnotation(Id.class);
+                    if (idAnn != null) {
+                        String name = caseColumnName(field.getName(), dbType);
+                        boolean isLob = (field.getAnnotation(Lob.class) != 
null);
+                        Integer type = getSQLType(field.getType(), isLob, 
dbType);
+                        columnTypes.put(name, type);
+                        indexedColumns.add(name);
+                        primaryKeyColumn = name;
+                    }
+                }
+            }
+            DiscriminatorColumn discAnn = 
clazz.getAnnotation(DiscriminatorColumn.class);
+            if (discAnn != null) {
+                String name = caseColumnName(discAnn.name(), dbType);
+                Integer type = getSQLType(discAnn.discriminatorType());
+                columnTypes.put(name, type);
+                indexedColumns.add(name);
+            }
+            // For some reason, MySQL doesn't end up having this index...
+            if (dbType.equals("mysql") && 
clazz.equals(WorkflowActionBean.class)) {
+                indexedColumns.remove("wf_id");
+            }
+        }
+
+        private static Integer getSQLType(Class<?> clazz, boolean isLob, 
String dbType) {
+            if (clazz.equals(String.class)) {
+                if (dbType.equals("mysql") && isLob) {
+                    return Types.LONGVARCHAR;
+                }
+                if (dbType.equals("oracle") && isLob) {
+                    return Types.CLOB;
+                }
+                return Types.VARCHAR;
+            }
+            if (clazz.equals(StringBlob.class) || 
clazz.equals(BinaryBlob.class)) {
+                if (dbType.equals("mysql")) {
+                    return Types.LONGVARBINARY;
+                }
+                if (dbType.equals("oracle")) {
+                    return Types.BLOB;
+                }
+                return Types.BINARY;
+            }
+            if (clazz.equals(Timestamp.class)) {
+                return Types.TIMESTAMP;
+            }
+            if (clazz.equals(int.class)) {
+                if (dbType.equals("oracle")) {
+                    return Types.DECIMAL;
+                }
+                return Types.INTEGER;
+            }
+            if (clazz.equals(long.class)) {
+                if (dbType.equals("oracle")) {
+                    return Types.DECIMAL;
+                }
+                return Types.BIGINT;
+            }
+            if (clazz.equals(byte.class)) {
+                if (dbType.equals("mysql")) {
+                    return Types.TINYINT;
+                }
+                if (dbType.equals("oracle")) {
+                    return Types.DECIMAL;
+                }
+                return Types.SMALLINT;
+            }
+            return null;
+        }
+
+        private static Integer getSQLType(DiscriminatorType discType) {
+            switch (discType) {
+                case STRING:
+                    return Types.VARCHAR;
+                case CHAR:
+                    return Types.CHAR;
+                case INTEGER:
+                    return Types.INTEGER;
+            }
+            return null;
+        }
+
+        private static String caseColumnName(String name, String dbType) {
+            // Oracle wants column names in all caps
+            if (dbType.equals("oracle")) {
+                return name.toUpperCase();
+            }
+            // Postgres and MySQL want column names in all lowers
+            if (dbType.equals("postgresql") || dbType.equals("mysql")) {
+                return name.toLowerCase();
+            }
+            return name;
+        }
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java 
b/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java
new file mode 100644
index 0000000..7fda9e2
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.SchemaCheckXCommand;
+import org.apache.oozie.util.Instrumentable;
+import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.XLog;
+
+import java.util.Date;
+
+public class SchemaCheckerService implements Service, Instrumentable {
+    private XLog LOG = XLog.getLog(SchemaCheckerService.class);
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"SchemaCheckerService.";
+    public static final String CONF_INTERVAL = CONF_PREFIX + "check.interval";
+    public static final String CONF_IGNORE_EXTRAS = CONF_PREFIX + 
"ignore.extras";
+
+    private String status = "N/A (not yet run)";
+    private String lastCheck = "N/A";
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        String url = ConfigurationService.get(JPAService.CONF_URL);
+        String dbType = url.substring("jdbc:".length());
+        dbType = dbType.substring(0, dbType.indexOf(":"));
+
+        int interval = ConfigurationService.getInt(CONF_INTERVAL);
+        if (dbType.equals("derby") || dbType.equals("hsqldb") || 
dbType.equals("sqlserver") || interval <= 0) {
+            LOG.debug("SchemaCheckerService is disabled: not supported for 
{0}", dbType);
+            status = "DISABLED (" + dbType + " no supported)";
+        } else {
+            String driver = ConfigurationService.get(JPAService.CONF_DRIVER);
+            String user = ConfigurationService.get(JPAService.CONF_USERNAME);
+            String pass = ConfigurationService.get(JPAService.CONF_PASSWORD);
+            boolean ignoreExtras = 
ConfigurationService.getBoolean(CONF_IGNORE_EXTRAS);
+
+            try {
+                Class.forName(driver).newInstance();
+            } catch (Exception ex) {
+                throw new ServiceException(ErrorCode.E0100, 
getClass().getName(), ex);
+            }
+            Runnable schemaCheckerRunnable = new SchemaCheckerRunnable(dbType, 
url, user, pass, ignoreExtras);
+            
services.get(SchedulerService.class).schedule(schemaCheckerRunnable, 0, 
interval, SchedulerService.Unit.HOUR);
+        }
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return SchemaCheckerService.class;
+    }
+
+    @Override
+    public void instrument(Instrumentation instr) {
+        instr.addVariable("schema-checker", "status", new 
Instrumentation.Variable<String>() {
+            @Override
+            public String getValue() {
+                return status;
+            }
+        });
+        instr.addVariable("schema-checker", "last-check", new 
Instrumentation.Variable<String>() {
+            @Override
+            public String getValue() {
+                return lastCheck;
+            }
+        });
+    }
+
+    public void updateInstrumentation(boolean problem, Date time) {
+        if (problem) {
+            status = "BAD (check log for details)";
+        } else {
+            status = "GOOD";
+        }
+        lastCheck = time.toString();
+    }
+
+    static class SchemaCheckerRunnable implements Runnable {
+        private String dbType;
+        private String url;
+        private String user;
+        private String pass;
+        private boolean ignoreExtras;
+
+        public SchemaCheckerRunnable(String dbType, String url, String user, 
String pass, boolean ignoreExtras) {
+            this.dbType = dbType;
+            this.url = url;
+            this.user = user;
+            this.pass = pass;
+            this.ignoreExtras = ignoreExtras;
+        }
+
+        @Override
+        public void run() {// Only queue the schema check command if this is 
the leader
+            if (Services.get().get(JobsConcurrencyService.class).isLeader()) {
+                Services.get().get(CallableQueueService.class).queue(
+                        new SchemaCheckXCommand(dbType, url, user, pass, 
ignoreExtras));
+            } else {
+                Services.get().get(SchemaCheckerService.class).status = 
"DISABLED (not leader in HA)";
+                Services.get().get(SchemaCheckerService.class).lastCheck = 
"N/A";
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index 32a1df0..d4a0536 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -130,7 +130,8 @@
             org.apache.oozie.service.ProxyUserService,
             org.apache.oozie.service.XLogStreamingService,
             org.apache.oozie.service.JvmPauseMonitorService,
-            org.apache.oozie.service.SparkConfigurationService
+            org.apache.oozie.service.SparkConfigurationService,
+            org.apache.oozie.service.SchemaCheckerService
         </value>
         <description>
             All services to be created and managed by Oozie Services singleton.
@@ -2580,4 +2581,22 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.SchemaCheckerService.check.interval</name>
+        <value>168</value>
+        <description>
+            This is the interval at which Oozie will check the database 
schema, in hours.
+            A zero or negative value will disable the checker.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.SchemaCheckerService.ignore.extras</name>
+        <value>false</value>
+        <description>
+            When set to false, the schema checker will consider extra (unused) 
tables, columns, and indexes to be incorrect.  When
+            set to true, these will be ignored.
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/docs/src/site/twiki/AG_Monitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Monitoring.twiki 
b/docs/src/site/twiki/AG_Monitoring.twiki
index 63f0542..2a3e9a0 100644
--- a/docs/src/site/twiki/AG_Monitoring.twiki
+++ b/docs/src/site/twiki/AG_Monitoring.twiki
@@ -158,6 +158,32 @@ query parameters:
    * threadsort - The order in which the threads are sorted for display. Valid 
values are name, cpu, state. Default is state.
    * cpuwatch - Time interval in milliseconds to monitor cpu usage of threads. 
Default value is 0.
 
+---++ Monitoring Database Schema Integrity
+
+Oozie stores all of its state in a database.  Hence, ensuring that the 
database schema is correct is very important to ensuring that
+Oozie is healthy and behaves correctly.  To help with this, Oozie includes a 
=SchemaCheckerService= which periodically runs and
+performs a series of checks on the database schema.  More specifically, it 
checks the following:
+   * Existence of the required tables
+   * Existence of the required columns in each table
+   * Each column has the correct type and default value
+   * Existence of the required primary keys and indexes
+
+After each run, the =SchemaCheckerService= writes the result of the checks to 
the Oozie log and to the "schema-checker.status"
+instrumentation variable.  If there's a problem, it will be logged at the 
ERROR level, while correct checks are logged at the DEBUG
+level.
+
+By default, the =SchemaCheckerService= runs every 7 days.  This can be 
configured
+by =oozie.service.SchemaCheckerService.check.interval=
+
+By default, the =SchemaCheckerService= will consider "extra" tables, columns, 
and indexes to be incorrect. Advanced users who have
+added additional tables, columns, and indexes can tell Oozie to ignore these by
+seting =oozie.service.SchemaCheckerService.ignore.extras= to =false=.
+
+The =SchemaCheckerService= currently only supports MySQL, PostgreSQL, and 
Oracle databases.  SQL Server and Derby are currently not
+supported.
+
+When Oozie HA is enabled, only one of the Oozie servers will perform the 
checks.
+
 [[index][::Go back to Oozie Documentation Index::]]
 
 </noautolink>

http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index eedbce6..a01e7c6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2245 Service to periodically check database schema (rkanter)
 OOZIE-2332 Add ability to provide Hive and Hive 2 Action queries inline in 
workflows (prateekrungta via rkanter)
 OOZIE-2329 Make handling yarn restarts configurable (puru)
 OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru)

Reply via email to