Repository: oozie Updated Branches: refs/heads/master b074b2cee -> 9839fb292
OOZIE-2245 Service to periodically check database schema (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9839fb29 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9839fb29 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9839fb29 Branch: refs/heads/master Commit: 9839fb292e5483f4d101849483109208de75f9c3 Parents: b074b2c Author: Robert Kanter <[email protected]> Authored: Thu Aug 27 10:15:34 2015 -0700 Committer: Robert Kanter <[email protected]> Committed: Thu Aug 27 10:15:34 2015 -0700 ---------------------------------------------------------------------- .../oozie/command/SchemaCheckXCommand.java | 529 +++++++++++++++++++ .../oozie/service/SchemaCheckerService.java | 125 +++++ core/src/main/resources/oozie-default.xml | 21 +- docs/src/site/twiki/AG_Monitoring.twiki | 26 + release-log.txt | 1 + 5 files changed, 701 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java new file mode 100644 index 0000000..1cc086e --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/SchemaCheckXCommand.java @@ -0,0 +1,529 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.oozie.BinaryBlob; +import org.apache.oozie.BundleActionBean; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.SLAEventBean; +import org.apache.oozie.StringBlob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.client.rest.JsonSLAEvent; +import org.apache.oozie.service.SchemaCheckerService; +import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLARegistrationBean; +import org.apache.oozie.sla.SLASummaryBean; +import org.apache.oozie.util.Pair; +import org.apache.oozie.util.XLog; +import org.apache.openjpa.persistence.jdbc.Index; + +import javax.persistence.Column; +import javax.persistence.DiscriminatorColumn; +import javax.persistence.DiscriminatorType; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.Table; +import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class SchemaCheckXCommand extends 
XCommand<Void> { + private XLog LOG = XLog.getLog(SchemaCheckXCommand.class); + + private String dbType; + private String url; + private String user; + private String pass; + private boolean ignoreExtras; + + public SchemaCheckXCommand(String dbType, String url, String user, String pass, boolean ignoreExtras) { + super("schema-check", "schema-check", 0); + this.dbType = dbType; + this.url = url; + this.user = user; + this.pass = pass; + this.ignoreExtras = ignoreExtras; + } + + @Override + protected Void execute() throws CommandException { + Connection conn = null; + LOG.info("About to check database schema"); + Date startTime = new Date(); + boolean problem = false; + try { + conn = DriverManager.getConnection(url, user, pass); + String catalog = conn.getCatalog(); + DatabaseMetaData metaData = conn.getMetaData(); + + Map<String, Class<? extends JsonBean>> tableClasses = new HashMap<String, Class<? extends JsonBean>>(); + tableClasses.put(getTableName(BundleActionBean.class), BundleActionBean.class); + tableClasses.put(getTableName(BundleJobBean.class), BundleJobBean.class); + tableClasses.put(getTableName(CoordinatorActionBean.class), CoordinatorActionBean.class); + tableClasses.put(getTableName(CoordinatorJobBean.class), CoordinatorJobBean.class); + tableClasses.put(getTableName(JsonSLAEvent.class), JsonSLAEvent.class); + tableClasses.put(getTableName(SLARegistrationBean.class), SLARegistrationBean.class); + tableClasses.put(getTableName(SLASummaryBean.class), SLASummaryBean.class); + tableClasses.put(getTableName(WorkflowActionBean.class), WorkflowActionBean.class); + tableClasses.put(getTableName(WorkflowJobBean.class), WorkflowJobBean.class); + + boolean tableProblem = checkTables(metaData, catalog, tableClasses.keySet()); + problem = problem | tableProblem; + if (!tableProblem) { + for (Map.Entry<String, Class<? 
extends JsonBean>> table : tableClasses.entrySet()) { + TableInfo ti = new TableInfo(table.getValue(), dbType); + boolean columnProblem = checkColumns(metaData, catalog, table.getKey(), ti.columnTypes); + problem = problem | columnProblem; + if (!columnProblem) { + boolean primaryKeyProblem = checkPrimaryKey(metaData, catalog, table.getKey(), ti.primaryKeyColumn); + problem = problem | primaryKeyProblem; + boolean indexProblem = checkIndexes(metaData, catalog, table.getKey(), ti.indexedColumns); + problem = problem | indexProblem; + } + } + } + if (problem) { + LOG.error("Database schema is BAD! Check previous error log messages for details"); + } else { + LOG.info("Database schema is GOOD"); + } + } catch (SQLException sqle) { + LOG.error("An Exception occured while talking to the database: " + sqle.getMessage(), sqle); + problem = true; + } finally { + if (conn != null) { + try { + conn.close(); + } catch (Exception e) { + LOG.error("An Exception occured while disconnecting from the database: " + e.getMessage(), e); + } + } + Services.get().get(SchemaCheckerService.class).updateInstrumentation(problem, startTime); + } + return null; + } + + private boolean checkTables(DatabaseMetaData metaData, String catalog, final Collection<String> expectedTablesRaw) + throws SQLException { + boolean problem = false; + Set<String> expectedTables = new HashSet<String>(expectedTablesRaw); + expectedTables.add(caseTableName("oozie_sys")); + expectedTables.add(caseTableName("openjpa_sequence_table")); + expectedTables.add(caseTableName("validate_conn")); + // Oracle returns > 1000 tables if we don't have the schema "OOZIE"; MySQL and Postgres don't want this + String schema = null; + if (dbType.equals("oracle")) { + schema = "OOZIE"; + } + ResultSet rs = metaData.getTables(catalog, schema, null, new String[]{"TABLE"}); + Set<String> foundTables = new HashSet<String>(); + while (rs.next()) { + String tabName = rs.getString("TABLE_NAME"); + if (tabName != null) { + 
foundTables.add(tabName); + } + } + Collection missingTables = CollectionUtils.subtract(expectedTables, foundTables); + if (!missingTables.isEmpty()) { + LOG.error("Found [{0}] missing tables: {1}", missingTables.size(), Arrays.toString(missingTables.toArray())); + problem = true; + } else if (LOG.isDebugEnabled()) { + LOG.debug("No missing tables found: {0}", Arrays.toString(expectedTables.toArray())); + } + if (!ignoreExtras) { + Collection extraTables = CollectionUtils.subtract(foundTables, expectedTables); + if (!extraTables.isEmpty()) { + LOG.error("Found [{0}] extra tables: {1}", extraTables.size(), Arrays.toString(extraTables.toArray())); + problem = true; + } else { + LOG.debug("No extra tables found"); + } + } + return problem; + } + + private boolean checkColumns(DatabaseMetaData metaData, String catalog, String table, + Map<String, Integer> expectedColumnTypes) throws SQLException { + boolean problem = false; + Map<String, Pair<Integer, String>> foundColumns = new HashMap<String, Pair<Integer, String>>(); + ResultSet rs = metaData.getColumns(catalog, null, table, null); + while (rs.next()) { + String colName = rs.getString("COLUMN_NAME"); + Integer dataType = rs.getInt("DATA_TYPE"); + String colDef = rs.getString("COLUMN_DEF"); + if (colName != null) { + foundColumns.put(colName, new Pair<Integer, String>(dataType, colDef)); + } + } + Collection missingColumns = CollectionUtils.subtract(expectedColumnTypes.keySet(), foundColumns.keySet()); + if (!missingColumns.isEmpty()) { + LOG.error("Found [{0}] missing columns in table [{1}]: {2}", + missingColumns.size(), table, Arrays.toString(missingColumns.toArray())); + problem = true; + } else { + for (Map.Entry<String, Integer> ent : expectedColumnTypes.entrySet()) { + if (!foundColumns.get(ent.getKey()).getFist().equals(ent.getValue())) { + LOG.error("Expected column [{0}] in table [{1}] to have type [{2}], but found type [{3}]", + ent.getKey(), table, getSQLTypeFromInt(ent.getValue()), + 
getSQLTypeFromInt(foundColumns.get(ent.getKey()).getFist())); + problem = true; + } else if (foundColumns.get(ent.getKey()).getSecond() != null) { + LOG.error("Expected column [{0}] in table [{1}] to have default value [NULL], but found default vale [{2}]", + ent.getKey(), table, foundColumns.get(ent.getKey()).getSecond()); + problem = true; + } else { + LOG.debug("Found column [{0}] in table [{1}] with type [{2}] and default value [NULL]", + ent.getKey(), table, getSQLTypeFromInt(ent.getValue())); + } + } + } + if (!ignoreExtras) { + Collection extraColumns = CollectionUtils.subtract(foundColumns.keySet(), expectedColumnTypes.keySet()); + if (!extraColumns.isEmpty()) { + LOG.error("Found [{0}] extra columns in table [{1}]: {2}", + extraColumns.size(), table, Arrays.toString(extraColumns.toArray())); + problem = true; + } else { + LOG.debug("No extra columns found in table [{0}]", table); + } + } + return problem; + } + + private boolean checkPrimaryKey(DatabaseMetaData metaData, String catalog, String table, String expectedPrimaryKeyColumn) + throws SQLException { + boolean problem = false; + ResultSet rs = metaData.getPrimaryKeys(catalog, null, table); + if (!rs.next()) { + LOG.error("Expected column [{0}] to be the primary key in table [{1}], but none were found", + expectedPrimaryKeyColumn, table); + problem = true; + } else { + String foundPrimaryKeyColumn = rs.getString("COLUMN_NAME"); + if (!foundPrimaryKeyColumn.equals(expectedPrimaryKeyColumn)) { + LOG.error("Expected column [{0}] to be the primary key in table [{1}], but found column [{2}] instead", + expectedPrimaryKeyColumn, table, foundPrimaryKeyColumn); + problem = true; + } else { + LOG.debug("Found column [{0}] to be the primary key in table [{1}]", expectedPrimaryKeyColumn, table); + } + } + return problem; + } + + private boolean checkIndexes(DatabaseMetaData metaData, String catalog, String table, Set<String> expectedIndexedColumns) + throws SQLException { + boolean problem = false; + Set<String> 
foundIndexedColumns = new HashSet<String>(); + ResultSet rs = metaData.getIndexInfo(catalog, null, table, false, true); + while (rs.next()) { + String colName = rs.getString("COLUMN_NAME"); + if (colName != null) { + foundIndexedColumns.add(colName); + } + } + Collection missingIndexColumns = CollectionUtils.subtract(expectedIndexedColumns, foundIndexedColumns); + if (!missingIndexColumns.isEmpty()) { + LOG.error("Found [{0}] missing indexes for columns in table [{1}]: {2}", + missingIndexColumns.size(), table, Arrays.toString(missingIndexColumns.toArray())); + problem = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("No missing indexes found in table [{0}]: {1}", + table, Arrays.toString(expectedIndexedColumns.toArray())); + } + } + if (!ignoreExtras) { + Collection extraIndexColumns = CollectionUtils.subtract(foundIndexedColumns, expectedIndexedColumns); + if (!extraIndexColumns.isEmpty()) { + LOG.error("Found [{0}] extra indexes for columns in table [{1}]: {2}", + extraIndexColumns.size(), table, Arrays.toString(extraIndexColumns.toArray())); + problem = true; + } else { + LOG.debug("No extra indexes found in table [{0}]", table); + } + } + return problem; + } + + private String getTableName(Class<? 
extends JsonBean> clazz) { + Table tabAnn = clazz.getAnnotation(Table.class); + if (tabAnn != null) { + return caseTableName(tabAnn.name()); + } + return null; + } + + private String caseTableName(String name) { + // MySQL and Oracle wants table names in all caps + if (dbType.equals("mysql") || dbType.equals("oracle")) { + return name.toUpperCase(); + } + // Postgres wants table names in all lowers + if (dbType.equals("postgresql")) { + return name.toLowerCase(); + } + return name; + } + + private String getSQLTypeFromInt(int t) { + switch (t) { + case Types.BIT: + return "BIT"; + case Types.TINYINT: + return "TINYINT"; + case Types.SMALLINT: + return "SMALLINT"; + case Types.INTEGER: + return "INTEGER"; + case Types.BIGINT: + return "BIGINT"; + case Types.FLOAT: + return "FLOAT"; + case Types.REAL: + return "REAL"; + case Types.DOUBLE: + return "DOUBLE"; + case Types.NUMERIC: + return "NUMERIC"; + case Types.DECIMAL: + return "DECIMAL"; + case Types.CHAR: + return "CHAR"; + case Types.VARCHAR: + return "VARCHAR"; + case Types.LONGVARCHAR: + return "LONGVARCHAR"; + case Types.DATE: + return "DATE"; + case Types.TIME: + return "TIME"; + case Types.TIMESTAMP: + return "TIMESTAMP"; + case Types.BINARY: + return "BINARY"; + case Types.VARBINARY: + return "VARBINARY"; + case Types.LONGVARBINARY: + return "LONGVARBINARY"; + case Types.NULL: + return "NULL"; + case Types.OTHER: + return "OTHER"; + case Types.JAVA_OBJECT: + return "JAVA_OBJECT"; + case Types.DISTINCT: + return "DISTINCT"; + case Types.STRUCT: + return "STRUCT"; + case Types.ARRAY: + return "ARRAY"; + case Types.BLOB: + return "BLOB"; + case Types.CLOB: + return "CLOB"; + case Types.REF: + return "REF"; + case Types.DATALINK: + return "DATALINK"; + case Types.BOOLEAN: + return "BOOLEAN"; + case Types.ROWID: + return "ROWID"; + case Types.NCHAR: + return "NCHAR"; + case Types.NVARCHAR: + return "NVARCHAR"; + case Types.LONGNVARCHAR: + return "LONGNVARCHAR"; + case Types.NCLOB: + return "NCLOB"; + case 
Types.SQLXML: + return "SQLXML"; + default: + return "unknown"; + } + } + + private static class TableInfo { + String primaryKeyColumn; + Map<String, Integer> columnTypes; + Set<String> indexedColumns; + + public TableInfo(Class<? extends JsonBean> clazz, String dbType) { + columnTypes = new HashMap<String, Integer>(); + indexedColumns = new HashSet<String>(); + populate(clazz, dbType); + // The "SLA_EVENTS" table is made up of two classes (JsonSLAEvent and SLAEventBean), and the reflection doesn't pick up + // from both automatically, so we have to manually do this + if (clazz.equals(JsonSLAEvent.class)) { + populate(SLAEventBean.class, dbType); + } + } + + private void populate(Class<? extends JsonBean> clazz, String dbType) { + Field[] fields = clazz.getDeclaredFields(); + for (Field field : fields) { + Column colAnn = field.getAnnotation(Column.class); + if (colAnn != null) { + String name = caseColumnName(colAnn.name(), dbType); + boolean isLob = (field.getAnnotation(Lob.class) != null); + Integer type = getSQLType(field.getType(), isLob, dbType); + columnTypes.put(name, type); + boolean isIndex = (field.getAnnotation(Index.class) != null); + if (isIndex) { + indexedColumns.add(name); + } + boolean isPrimaryKey = (field.getAnnotation(Id.class) != null); + if (isPrimaryKey) { + indexedColumns.add(name); + primaryKeyColumn = name; + } + } else { + // Some Id fields don't have an @Column annotation + Id idAnn = field.getAnnotation(Id.class); + if (idAnn != null) { + String name = caseColumnName(field.getName(), dbType); + boolean isLob = (field.getAnnotation(Lob.class) != null); + Integer type = getSQLType(field.getType(), isLob, dbType); + columnTypes.put(name, type); + indexedColumns.add(name); + primaryKeyColumn = name; + } + } + } + DiscriminatorColumn discAnn = clazz.getAnnotation(DiscriminatorColumn.class); + if (discAnn != null) { + String name = caseColumnName(discAnn.name(), dbType); + Integer type = getSQLType(discAnn.discriminatorType()); + 
columnTypes.put(name, type); + indexedColumns.add(name); + } + // For some reason, MySQL doesn't end up having this index... + if (dbType.equals("mysql") && clazz.equals(WorkflowActionBean.class)) { + indexedColumns.remove("wf_id"); + } + } + + private static Integer getSQLType(Class<?> clazz, boolean isLob, String dbType) { + if (clazz.equals(String.class)) { + if (dbType.equals("mysql") && isLob) { + return Types.LONGVARCHAR; + } + if (dbType.equals("oracle") && isLob) { + return Types.CLOB; + } + return Types.VARCHAR; + } + if (clazz.equals(StringBlob.class) || clazz.equals(BinaryBlob.class)) { + if (dbType.equals("mysql")) { + return Types.LONGVARBINARY; + } + if (dbType.equals("oracle")) { + return Types.BLOB; + } + return Types.BINARY; + } + if (clazz.equals(Timestamp.class)) { + return Types.TIMESTAMP; + } + if (clazz.equals(int.class)) { + if (dbType.equals("oracle")) { + return Types.DECIMAL; + } + return Types.INTEGER; + } + if (clazz.equals(long.class)) { + if (dbType.equals("oracle")) { + return Types.DECIMAL; + } + return Types.BIGINT; + } + if (clazz.equals(byte.class)) { + if (dbType.equals("mysql")) { + return Types.TINYINT; + } + if (dbType.equals("oracle")) { + return Types.DECIMAL; + } + return Types.SMALLINT; + } + return null; + } + + private static Integer getSQLType(DiscriminatorType discType) { + switch (discType) { + case STRING: + return Types.VARCHAR; + case CHAR: + return Types.CHAR; + case INTEGER: + return Types.INTEGER; + } + return null; + } + + private static String caseColumnName(String name, String dbType) { + // Oracle wants column names in all caps + if (dbType.equals("oracle")) { + return name.toUpperCase(); + } + // Postgres and MySQL want column names in all lowers + if (dbType.equals("postgresql") || dbType.equals("mysql")) { + return name.toLowerCase(); + } + return name; + } + } + + @Override + protected void loadState() throws CommandException { + } + + @Override + protected void verifyPrecondition() throws 
CommandException, PreconditionException { + } + + @Override + protected boolean isLockRequired() { + return false; + } + + @Override + public String getEntityKey() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java b/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java new file mode 100644 index 0000000..7fda9e2 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/service/SchemaCheckerService.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.service; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.command.SchemaCheckXCommand; +import org.apache.oozie.util.Instrumentable; +import org.apache.oozie.util.Instrumentation; +import org.apache.oozie.util.XLog; + +import java.util.Date; + +public class SchemaCheckerService implements Service, Instrumentable { + private XLog LOG = XLog.getLog(SchemaCheckerService.class); + + public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaCheckerService."; + public static final String CONF_INTERVAL = CONF_PREFIX + "check.interval"; + public static final String CONF_IGNORE_EXTRAS = CONF_PREFIX + "ignore.extras"; + + private String status = "N/A (not yet run)"; + private String lastCheck = "N/A"; + + @Override + public void init(Services services) throws ServiceException { + String url = ConfigurationService.get(JPAService.CONF_URL); + String dbType = url.substring("jdbc:".length()); + dbType = dbType.substring(0, dbType.indexOf(":")); + + int interval = ConfigurationService.getInt(CONF_INTERVAL); + if (dbType.equals("derby") || dbType.equals("hsqldb") || dbType.equals("sqlserver") || interval <= 0) { + LOG.debug("SchemaCheckerService is disabled: not supported for {0}", dbType); + status = "DISABLED (" + dbType + " no supported)"; + } else { + String driver = ConfigurationService.get(JPAService.CONF_DRIVER); + String user = ConfigurationService.get(JPAService.CONF_USERNAME); + String pass = ConfigurationService.get(JPAService.CONF_PASSWORD); + boolean ignoreExtras = ConfigurationService.getBoolean(CONF_IGNORE_EXTRAS); + + try { + Class.forName(driver).newInstance(); + } catch (Exception ex) { + throw new ServiceException(ErrorCode.E0100, getClass().getName(), ex); + } + Runnable schemaCheckerRunnable = new SchemaCheckerRunnable(dbType, url, user, pass, ignoreExtras); + services.get(SchedulerService.class).schedule(schemaCheckerRunnable, 0, interval, SchedulerService.Unit.HOUR); + } + } + + @Override + public 
void destroy() { + } + + @Override + public Class<? extends Service> getInterface() { + return SchemaCheckerService.class; + } + + @Override + public void instrument(Instrumentation instr) { + instr.addVariable("schema-checker", "status", new Instrumentation.Variable<String>() { + @Override + public String getValue() { + return status; + } + }); + instr.addVariable("schema-checker", "last-check", new Instrumentation.Variable<String>() { + @Override + public String getValue() { + return lastCheck; + } + }); + } + + public void updateInstrumentation(boolean problem, Date time) { + if (problem) { + status = "BAD (check log for details)"; + } else { + status = "GOOD"; + } + lastCheck = time.toString(); + } + + static class SchemaCheckerRunnable implements Runnable { + private String dbType; + private String url; + private String user; + private String pass; + private boolean ignoreExtras; + + public SchemaCheckerRunnable(String dbType, String url, String user, String pass, boolean ignoreExtras) { + this.dbType = dbType; + this.url = url; + this.user = user; + this.pass = pass; + this.ignoreExtras = ignoreExtras; + } + + @Override + public void run() {// Only queue the schema check command if this is the leader + if (Services.get().get(JobsConcurrencyService.class).isLeader()) { + Services.get().get(CallableQueueService.class).queue( + new SchemaCheckXCommand(dbType, url, user, pass, ignoreExtras)); + } else { + Services.get().get(SchemaCheckerService.class).status = "DISABLED (not leader in HA)"; + Services.get().get(SchemaCheckerService.class).lastCheck = "N/A"; + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 32a1df0..d4a0536 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml 
@@ -130,7 +130,8 @@ org.apache.oozie.service.ProxyUserService, org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.JvmPauseMonitorService, - org.apache.oozie.service.SparkConfigurationService + org.apache.oozie.service.SparkConfigurationService, + org.apache.oozie.service.SchemaCheckerService </value> <description> All services to be created and managed by Oozie Services singleton. @@ -2580,4 +2581,22 @@ </description> </property> + <property> + <name>oozie.service.SchemaCheckerService.check.interval</name> + <value>168</value> + <description> + This is the interval at which Oozie will check the database schema, in hours. + A zero or negative value will disable the checker. + </description> + </property> + + <property> + <name>oozie.service.SchemaCheckerService.ignore.extras</name> + <value>false</value> + <description> + When set to false, the schema checker will consider extra (unused) tables, columns, and indexes to be incorrect. When + set to true, these will be ignored. + </description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/docs/src/site/twiki/AG_Monitoring.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/AG_Monitoring.twiki b/docs/src/site/twiki/AG_Monitoring.twiki index 63f0542..2a3e9a0 100644 --- a/docs/src/site/twiki/AG_Monitoring.twiki +++ b/docs/src/site/twiki/AG_Monitoring.twiki @@ -158,6 +158,32 @@ query parameters: * threadsort - The order in which the threads are sorted for display. Valid values are name, cpu, state. Default is state. * cpuwatch - Time interval in milliseconds to monitor cpu usage of threads. Default value is 0. +---++ Monitoring Database Schema Integrity + +Oozie stores all of its state in a database. Hence, ensuring that the database schema is correct is very important to ensuring that +Oozie is healthy and behaves correctly. 
To help with this, Oozie includes a =SchemaCheckerService= which periodically runs and +performs a series of checks on the database schema. More specifically, it checks the following: + * Existence of the required tables + * Existence of the required columns in each table + * Each column has the correct type and default value + * Existence of the required primary keys and indexes + +After each run, the =SchemaCheckerService= writes the result of the checks to the Oozie log and to the "schema-checker.status" +instrumentation variable. If there's a problem, it will be logged at the ERROR level, while correct checks are logged at the DEBUG +level. + +By default, the =SchemaCheckerService= runs every 7 days. This can be configured +by =oozie.service.SchemaCheckerService.check.interval= + +By default, the =SchemaCheckerService= will consider "extra" tables, columns, and indexes to be incorrect. Advanced users who have +added additional tables, columns, and indexes can tell Oozie to ignore these by +setting =oozie.service.SchemaCheckerService.ignore.extras= to =true=. + +The =SchemaCheckerService= currently only supports MySQL, PostgreSQL, and Oracle databases. SQL Server and Derby are currently not +supported. + +When Oozie HA is enabled, only one of the Oozie servers will perform the checks. 
+ [[index][::Go back to Oozie Documentation Index::]] </noautolink> http://git-wip-us.apache.org/repos/asf/oozie/blob/9839fb29/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index eedbce6..a01e7c6 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2245 Service to periodically check database schema (rkanter) OOZIE-2332 Add ability to provide Hive and Hive 2 Action queries inline in workflows (prateekrungta via rkanter) OOZIE-2329 Make handling yarn restarts configurable (puru) OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru)
