http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java new file mode 100644 index 0000000..72d1aba --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.store.service; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.service.FalconService; +import org.apache.falcon.state.store.jdbc.EntityBean; +import org.apache.falcon.state.store.jdbc.InstanceBean; +import org.apache.falcon.util.StartupProperties; +import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import java.text.MessageFormat; +import java.util.Properties; + +/** + * Service that manages JPA. + */ +public final class FalconJPAService implements FalconService { + + private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class); + public static final String PREFIX = "falcon.statestore."; + + public static final String DB_SCHEMA = PREFIX + "schema.name"; + public static final String URL = PREFIX + "jdbc.url"; + public static final String DRIVER = PREFIX + "jdbc.driver"; + public static final String USERNAME = PREFIX + "jdbc.username"; + public static final String PASSWORD = PREFIX + "jdbc.password"; + public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source"; + public static final String CONN_PROPERTIES = PREFIX + "connection.properties"; + public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn"; + public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema"; + public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection"; + public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval"; + public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num"; + + private EntityManagerFactory entityManagerFactory; + // Persistent Unit which is defined in persistence.xml + private String persistenceUnit; + private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService(); + + private FalconJPAService() { + } + + public static FalconJPAService get() { + return FALCON_JPA_SERVICE; + } + + public EntityManagerFactory getEntityManagerFactory() { + return entityManagerFactory; + } + + public void setPersistenceUnit(String dbType) { + if (StringUtils.isEmpty(dbType)) { + throw new IllegalArgumentException(" DB type cannot be null or empty"); + } + dbType = dbType.split(":")[0]; + this.persistenceUnit = "falcon-" + dbType; + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public void init() throws FalconException { + Properties props = getPropsforStore(); + entityManagerFactory = Persistence. + createEntityManagerFactory(persistenceUnit, props); + EntityManager entityManager = getEntityManager(); + entityManager.find(EntityBean.class, 1); + entityManager.find(InstanceBean.class, 1); + LOG.info("All entities initialized"); + + // need to use a pseudo no-op transaction so all entities, datasource + // and connection pool are initialized one time only + entityManager.getTransaction().begin(); + OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory; + // Mask the password with '***' + String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,"); + LOG.info("JPA configuration: {0}", logMsg); + entityManager.getTransaction().commit(); + entityManager.close(); + } + + private Properties getPropsforStore() throws FalconException { + String dbSchema = StartupProperties.get().getProperty(DB_SCHEMA); + String url = StartupProperties.get().getProperty(URL); + String driver = StartupProperties.get().getProperty(DRIVER); + String user = StartupProperties.get().getProperty(USERNAME); + String password = StartupProperties.get().getProperty(PASSWORD).trim(); + String maxConn = StartupProperties.get().getProperty(MAX_ACTIVE_CONN).trim(); + String dataSource = StartupProperties.get().getProperty(CONN_DATA_SOURCE); + String connPropsConfig = StartupProperties.get().getProperty(CONN_PROPERTIES); + boolean autoSchemaCreation = Boolean.parseBoolean(StartupProperties.get().getProperty(CREATE_DB_SCHEMA, + "false")); + boolean validateDbConn = Boolean.parseBoolean(StartupProperties.get().getProperty(VALIDATE_DB_CONN, "true")); + String evictionInterval = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim(); + String evictionNum = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim(); + + if (!url.startsWith("jdbc:")) { + throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url); + } + String dbType = url.substring("jdbc:".length()); + if (dbType.indexOf(":") <= 0) { + throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url); + } + setPersistenceUnit(dbType); + String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}"; + connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn); + Properties props = new Properties(); + if (autoSchemaCreation) { + connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; + props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)"); + } else if (validateDbConn) { + // validation can be done only if the schema already exist, else a + // connection cannot be obtained to create the schema. + String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval; + String num = "numTestsPerEvictionRun=" + evictionNum; + connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num; + connProps += ",ValidationQuery=select count(*) from ENTITIES"; + connProps = MessageFormat.format(connProps, dbSchema); + } else { + connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false"; + } + if (connPropsConfig != null) { + connProps += "," + connPropsConfig; + } + props.setProperty("openjpa.ConnectionProperties", connProps); + props.setProperty("openjpa.ConnectionDriverName", dataSource); + return props; + } + + @Override + public void destroy() throws FalconException { + if (entityManagerFactory.isOpen()) { + entityManagerFactory.close(); + } + } + + + /** + * Return an EntityManager. Used by the StoreService. + * + * @return an entity manager + */ + public EntityManager getEntityManager() { + return getEntityManagerFactory().createEntityManager(); + } +}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java new file mode 100644 index 0000000..381b0b3 --- /dev/null +++ b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.tools; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.falcon.cli.CLIParser; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.util.BuildProperties; +import org.apache.falcon.util.StartupProperties; + +import java.io.File; +import java.io.FileWriter; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Command Line utility for Table Creation, Update. + */ +public class FalconStateStoreDBCLI { + public static final String HELP_CMD = "help"; + public static final String VERSION_CMD = "version"; + public static final String CREATE_CMD = "create"; + public static final String SQL_FILE_OPT = "sqlfile"; + public static final String RUN_OPT = "run"; + public static final String UPGRADE_CMD = "upgrade"; + + // Represents whether DB instance exists or not. + private boolean instanceExists; + private static final String[] FALCON_HELP = {"Falcon DB initialization tool currently supports Derby DB"}; + + public static void main(String[] args) { + new FalconStateStoreDBCLI().run(args); + } + + public FalconStateStoreDBCLI() { + instanceExists = false; + } + + protected Options getOptions() { + Option sqlfile = new Option(SQL_FILE_OPT, true, + "Generate SQL script instead of creating/upgrading the DB schema"); + Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade"); + Options options = new Options(); + options.addOption(sqlfile); + options.addOption(run); + return options; + } + + public synchronized int run(String[] args) { + if (instanceExists) { + throw new IllegalStateException("CLI instance already used"); + } + instanceExists = true; + + CLIParser parser = new CLIParser("falcondb", FALCON_HELP); + parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false); + parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false); + parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false); + parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false); + + try { + CLIParser.Command command = parser.parse(args); + if (command.getName().equals(HELP_CMD)) { + parser.showHelp(); + } else if (command.getName().equals(VERSION_CMD)) { + showVersion(); + } else { + if (!command.getCommandLine().hasOption(SQL_FILE_OPT) + && !command.getCommandLine().hasOption(RUN_OPT)) { + throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified"); + } + CommandLine commandLine = command.getCommandLine(); + String sqlFile = (commandLine.hasOption(SQL_FILE_OPT)) + ? commandLine.getOptionValue(SQL_FILE_OPT) + : File.createTempFile("falcondb-", ".sql").getAbsolutePath(); + boolean run = commandLine.hasOption(RUN_OPT); + if (command.getName().equals(CREATE_CMD)) { + createDB(sqlFile, run); + } else if (command.getName().equals(UPGRADE_CMD)) { + upgradeDB(sqlFile, run); + } + System.out.println("The SQL commands have been written to: " + sqlFile); + if (!run) { + System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option"); + } + } + return 0; + } catch (ParseException ex) { + System.err.println("Invalid sub-command: " + ex.getMessage()); + System.err.println(); + System.err.println(parser.shortHelp()); + return 1; + } catch (Exception ex) { + System.err.println(); + System.err.println("Error: " + ex.getMessage()); + System.err.println(); + System.err.println("Stack trace for the error was (for debug purposes):"); + System.err.println("--------------------------------------"); + ex.printStackTrace(System.err); + System.err.println("--------------------------------------"); + System.err.println(); + return 1; + } + } + + private void upgradeDB(String sqlFile, boolean run) throws Exception { + validateConnection(); + if (!checkDBExists()) { + throw new Exception("Falcon DB doesn't exist"); + } + String falconVersion = BuildProperties.get().getProperty("project.version"); + String dbVersion = getFalconDBVersion(); + if (dbVersion.compareTo(falconVersion) >= 0) { + System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'"); + return; + } + + createUpgradeDB(sqlFile, run, false); + upgradeFalconDBVersion(sqlFile, run, falconVersion); + + // any post upgrade tasks + if (run) { + System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'"); + } + } + + + private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception { + String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'"; + PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); + writer.println(); + writer.println(updateDBVersion); + writer.close(); + System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version); + if (run) { + Connection conn = createConnection(); + Statement st = null; + try { + conn.setAutoCommit(true); + st = conn.createStatement(); + st.executeUpdate(updateDBVersion); + st.close(); + } catch (Exception ex) { + throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeStatement(st); + conn.close(); + } + } + System.out.println("DONE"); + } + + private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'"; + + private String getFalconDBVersion() throws Exception { + String version; + System.out.println("Get Falcon DB version"); + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(GET_FALCON_DB_VERSION); + if (rs.next()) { + version = rs.getString(1); + } else { + throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table"); + } + } catch (Exception ex) { + throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + System.out.println("DONE"); + return version; + } + + + private Map<String, String> getJdbcConf() throws Exception { + Map<String, String> jdbcConf = new HashMap<String, String>(); + jdbcConf.put("driver", StartupProperties.get().getProperty(FalconJPAService.DRIVER)); + String url = StartupProperties.get().getProperty(FalconJPAService.URL); + jdbcConf.put("url", url); + jdbcConf.put("user", StartupProperties.get().getProperty(FalconJPAService.USERNAME)); + jdbcConf.put("password", StartupProperties.get().getProperty(FalconJPAService.PASSWORD)); + String dbType = url.substring("jdbc:".length()); + if (dbType.indexOf(":") <= 0) { + throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'"); + } + dbType = dbType.substring(0, dbType.indexOf(":")); + jdbcConf.put("dbtype", dbType); + return jdbcConf; + } + + private String[] createMappingToolArguments(String sqlFile) throws Exception { + Map<String, String> conf = getJdbcConf(); + List<String> args = new ArrayList<String>(); + args.add("-schemaAction"); + args.add("add"); + args.add("-p"); + args.add("persistence.xml#falcon-" + conf.get("dbtype")); + args.add("-connectionDriverName"); + args.add(conf.get("driver")); + args.add("-connectionURL"); + args.add(conf.get("url")); + args.add("-connectionUserName"); + args.add(conf.get("user")); + args.add("-connectionPassword"); + args.add(conf.get("password")); + if (sqlFile != null) { + args.add("-sqlFile"); + args.add(sqlFile); + } + args.add("-indexes"); + args.add("true"); + args.add("org.apache.falcon.state.store.jdbc.EntityBean"); + args.add("org.apache.falcon.state.store.jdbc.InstanceBean"); + return args.toArray(new String[args.size()]); + } + + private void createDB(String sqlFile, boolean run) throws Exception { + validateConnection(); + if (checkDBExists()) { + return; + } + + verifyFalconPropsTable(false); + createUpgradeDB(sqlFile, run, true); + createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version")); + if (run) { + System.out.println("Falcon DB has been created for Falcon version '" + + BuildProperties.get().getProperty("project.version") + "'"); + } + } + + private static final String CREATE_FALCON_DB_PROPS = + "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))"; + + private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception { + String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')"; + + PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true)); + writer.println(); + writer.println(CREATE_FALCON_DB_PROPS); + writer.println(insertDbVerion); + writer.close(); + System.out.println("Create FALCON_DB_PROPS table"); + if (run) { + Connection conn = createConnection(); + Statement st = null; + try { + conn.setAutoCommit(true); + st = conn.createStatement(); + st.executeUpdate(CREATE_FALCON_DB_PROPS); + st.executeUpdate(insertDbVerion); + st.close(); + } catch (Exception ex) { + closeStatement(st); + throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + conn.close(); + } + } + System.out.println("DONE"); + } + + private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS"; + + private boolean verifyFalconPropsTable(boolean exists) throws Exception { + System.out.println((exists) ? "Check FALCON_DB_PROPS table exists" + : "Checking FALCON_DB_PROPS table does not exist"); + boolean tableExists; + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(FALCON_DB_PROPS_EXISTS); + rs.next(); + tableExists = true; + } catch (Exception ex) { + tableExists = false; + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + if (tableExists != exists) { + throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists")); + } + System.out.println("DONE"); + return tableExists; + } + + private void closeResultSet(ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (Exception e) { + System.out.println("Unable to close ResultSet " + rs); + } + } + + private void closeStatement(Statement st) throws Exception { + try { + if (st != null) { + st.close(); + } + } catch (Exception e) { + System.out.println("Unable to close SQL Statement " + st); + throw new Exception(e); + } + } + + private Connection createConnection() throws Exception { + Map<String, String> conf = getJdbcConf(); + Class.forName(conf.get("driver")).newInstance(); + return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password")); + } + + private void validateConnection() throws Exception { + System.out.println("Validating DB Connection"); + try { + createConnection().close(); + System.out.println("DONE"); + } catch (Exception ex) { + throw new Exception("Could not connect to the database: " + ex.toString(), ex); + } + } + + private static final String ENTITY_STATUS_QUERY = + "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')"; + private static final String INSTANCE_STATUS_QUERY = + "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')"; + + private boolean checkDBExists() throws Exception { + boolean schemaExists; + Connection conn = createConnection(); + ResultSet rs = null; + Statement st = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(ENTITY_STATUS_QUERY); + rs.next(); + schemaExists = true; + } catch (Exception ex) { + schemaExists = false; + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist")); + return schemaExists; + } + + private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception { + System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema"); + String[] args = createMappingToolArguments(sqlFile); + org.apache.openjpa.jdbc.meta.MappingTool.main(args); + if (run) { + args = createMappingToolArguments(null); + org.apache.openjpa.jdbc.meta.MappingTool.main(args); + } + System.out.println("DONE"); + } + + private void showVersion() throws Exception { + System.out.println("Falcon Server version: " + + BuildProperties.get().getProperty("project.version")); + validateConnection(); + if (!checkDBExists()) { + throw new Exception("Falcon DB doesn't exist"); + } + try { + verifyFalconPropsTable(true); + } catch (Exception ex) { + throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool"); + } + showFalconPropsInfo(); + } + + private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name"; + + private void showFalconPropsInfo() throws Exception { + Connection conn = createConnection(); + Statement st = null; + ResultSet rs = null; + try { + System.out.println("Falcon DB Version Information"); + System.out.println("--------------------------------------"); + st = conn.createStatement(); + rs = st.executeQuery(GET_FALCON_PROPS_INFO); + while (rs.next()) { + System.out.println(rs.getString(1) + ": " + rs.getString(2)); + } + System.out.println("--------------------------------------"); + } catch (Exception ex) { + throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex); + } finally { + closeResultSet(rs); + closeStatement(st); + conn.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml new file mode 100644 index 0000000..86558de --- /dev/null +++ b/scheduler/src/main/resources/META-INF/persistence.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<persistence xmlns="http://java.sun.com/xml/ns/persistence" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + version="1.0"> + + <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL"> + <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider> + + <class>org.apache.falcon.state.store.jdbc.EntityBean</class> + <class>org.apache.falcon.state.store.jdbc.InstanceBean</class> + + <properties> + <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/> + + <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time --> + + <property name="openjpa.MetaDataFactory" + value="jpa(Types=org.apache.falcon.state.store.EntityBean; + org.apache.falcon.state.store.InstanceBean)"></property> + + <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/> + <property name="openjpa.LockManager" value="pessimistic"/> + <property name="openjpa.ReadLockLevel" value="read"/> + <property name="openjpa.WriteLockLevel" value="write"/> + <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM--> + <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/> + <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/> + <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/> + <property name="openjpa.Log" value="log4j"/> + </properties> + </persistence-unit> + +</persistence> http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/falcon-buildinfo.properties ---------------------------------------------------------------------- diff --git a/scheduler/src/main/resources/falcon-buildinfo.properties b/scheduler/src/main/resources/falcon-buildinfo.properties new file mode 100644 index 0000000..5a7cb82 --- /dev/null +++ b/scheduler/src/main/resources/falcon-buildinfo.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###################### +*.domain=all + +*.build.user=${user.name} +*.build.epoch=${timestamp} +*.project.version=${pom.version} +*.build.version=${pom.version}-r${buildNumber} +*.vc.revision=${buildNumber} +*.vc.source.url=${scm.connection} +###################### http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java index bff92c9..2a9fbce 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java @@ -19,7 +19,6 @@ package org.apache.falcon.execution; import org.apache.falcon.FalconException; import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; @@ -36,6 +35,7 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService; import org.apache.falcon.notification.service.impl.JobCompletionService; import org.apache.falcon.notification.service.impl.SchedulerService; import org.apache.falcon.service.Services; +import org.apache.falcon.state.AbstractSchedulerTestBase; import org.apache.falcon.state.EntityClusterID; import org.apache.falcon.state.EntityID; import org.apache.falcon.state.EntityState; @@ -43,7 +43,8 @@ import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; -import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.state.store.service.FalconJPAService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; @@ -59,6 +60,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; @@ -68,32 +70,38 @@ import java.util.Iterator; /** * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s. */ -public class FalconExecutionServiceTest extends AbstractTestBase { +public class FalconExecutionServiceTest extends AbstractSchedulerTestBase { - private InMemoryStateStore stateStore = null; + private StateStore stateStore = null; private AlarmService mockTimeService; private DataAvailabilityService mockDataService; private SchedulerService mockSchedulerService; private JobCompletionService mockCompletionService; private DAGEngine dagEngine; private int instanceCount = 0; + private static FalconJPAService falconJPAService = FalconJPAService.get(); @BeforeClass public void init() throws Exception { this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); this.conf = dfsCluster.getConf(); setupServices(); + super.setup(); + createDB(DB_SQL_FILE); + falconJPAService.init(); setupConfigStore(); } @AfterClass - public void tearDown() { + public void tearDown() throws FalconException, IOException { + super.cleanup(); this.dfsCluster.shutdown(); + falconJPAService.destroy(); } // State store is set up to sync with Config Store. That gets tested too. public void setupConfigStore() throws Exception { - stateStore = (InMemoryStateStore) AbstractStateStore.get(); + stateStore = AbstractStateStore.get(); getStore().registerListener(stateStore); storeEntity(EntityType.CLUSTER, "testCluster"); storeEntity(EntityType.FEED, "clicksFeed"); @@ -160,6 +168,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(event); // Ensure the instance is ready for execution + instance = stateStore.getExecutionInstance(new InstanceID(instance.getInstance())); Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY); // Simulate a scheduled notification @@ -211,12 +220,15 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(event); // One in ready and one in waiting. Both should be suspended. + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING); FalconExecutionService.get().suspend(process); + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED); Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), @@ -225,7 +237,11 @@ public class FalconExecutionServiceTest extends AbstractTestBase { instance2.getInstance().getId()); Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID); + Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), executorID); + FalconExecutionService.get().resume(process); + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING); @@ -237,17 +253,22 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(event); // One in running and the other in ready. Both should be suspended + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); FalconExecutionService.get().suspend(process); + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED); FalconExecutionService.get().resume(process); - + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); @@ -255,6 +276,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance()); FalconExecutionService.get().onEvent(event); + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED); } @@ -294,6 +316,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(event); // One in ready, one in waiting and one running. + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); + instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING); @@ -329,6 +354,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(dataEvent); + instanceState = stateStore.getExecutionInstance(new InstanceID(instanceState.getInstance())); Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT); } @@ -390,6 +416,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase { FalconExecutionService.get().onEvent(event); // One in ready, one in waiting and one running. + instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance())); + instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance())); + instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance())); Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING); Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY); Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING); @@ -442,7 +471,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase { EntityID processID = new EntityID(process); // Store couple of instances in store - stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED); + EntityState entityState = stateStore.getEntity(processID); + entityState.setCurrentState(EntityState.STATE.SCHEDULED); + stateStore.updateEntity(entityState); ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process, new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName); InstanceState instanceState1 = new InstanceState(instance1); @@ -459,11 +490,13 @@ public class FalconExecutionServiceTest extends AbstractTestBase { // Simulate a scheduled notification. This should cause the reload from state store Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance()); FalconExecutionService.get().onEvent(event); + instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance())); Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING); // Simulate a Job completion notification and ensure the instance resumes from where it left event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance()); FalconExecutionService.get().onEvent(event); + instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance())); Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED); } @@ -500,6 +533,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase { Assert.fail("Exception expected."); } catch (Exception e) { // One instance must fail and the other not + instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance())); + instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance())); Assert.assertEquals(instanceState2.getCurrentState(), state); Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING); } @@ -508,6 +543,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase { ((MockDAGEngine)dagEngine).removeFailInstance(instance1); m.invoke(FalconExecutionService.get(), process); + instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance())); + instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance())); // Both instances must be in expected state. Assert.assertEquals(instanceState2.getCurrentState(), state); Assert.assertEquals(instanceState1.getCurrentState(), state); http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java index 001f466..c43ccf0 100644 --- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java @@ -40,7 +40,7 @@ import org.apache.falcon.state.ID; import org.apache.falcon.state.InstanceID; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; -import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.state.store.StateStore; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.DAGEngine; import org.apache.falcon.workflow.engine.DAGEngineFactory; @@ -63,10 +63,10 @@ import static org.apache.falcon.state.InstanceState.STATE; */ public class SchedulerServiceTest extends AbstractTestBase { - private SchedulerService scheduler = Mockito.spy(new SchedulerService()); + private SchedulerService scheduler; private NotificationHandler handler; private static String cluster = "testCluster"; - private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get(); + private static StateStore stateStore; private static DAGEngine mockDagEngine; private static Process process; private volatile boolean failed = false; @@ -79,6 +79,10 @@ public class SchedulerServiceTest extends AbstractTestBase { @BeforeClass public void init() throws Exception { + StartupProperties.get().setProperty("falcon.state.store.impl", + "org.apache.falcon.state.store.InMemoryStateStore"); + stateStore = AbstractStateStore.get(); + scheduler = Mockito.spy(new SchedulerService()); this.dfsCluster = EmbeddedCluster.newCluster(cluster); this.conf = dfsCluster.getConf(); setupConfigStore(); @@ -97,6 +101,7 @@ public class SchedulerServiceTest extends AbstractTestBase { scheduler.init(); StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName()); mockDagEngine = DAGEngineFactory.getDAGEngine("testCluster"); + } @AfterClass @@ -199,7 +204,7 @@ public class SchedulerServiceTest extends AbstractTestBase { WorkflowJob.Status.SUCCEEDED, DateTime.now())); // Dependency now satisfied. Now, the first instance should get scheduled after retry delay. Thread.sleep(100); - Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1)); + Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1)); } @Test http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java new file mode 100644 index 0000000..48c1426 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state; + +import org.apache.falcon.entity.AbstractTestBase; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.tools.FalconStateStoreDBCLI; +import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; + +import java.io.File; +import java.io.IOException; + +/** + * TestBase for tests in scheduler. + */ +public class AbstractSchedulerTestBase extends AbstractTestBase { + private static final String DB_BASE_DIR = "target/test-data/falcondb"; + protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db"; + protected static String url = "jdbc:derby:"+ dbLocation +";create=true"; + protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql"; + protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql"; + protected LocalFileSystem fs = new LocalFileSystem(); + + public void setup() throws Exception { + StartupProperties.get(); + StartupProperties.get().setProperty(FalconJPAService.URL, url); + Configuration localConf = new Configuration(); + fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf); + fs.mkdirs(new Path(DB_BASE_DIR)); + } + + public void cleanup() throws IOException { + cleanupDB(); + } + + private void cleanupDB() throws IOException { + fs.delete(new Path(DB_BASE_DIR), true); + } + + protected void createDB(String file) { + File sqlFile = new File(file); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + } + + protected int execDBCLICommands(String[] args) { + return new FalconStateStoreDBCLI().run(args); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java index 2f32b43..6676754 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java @@ -18,55 +18,72 @@ package org.apache.falcon.state; import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.exception.InvalidStateTransitionException; +import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.state.store.AbstractStateStore; -import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.util.StartupProperties; import org.mockito.Mockito; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** * Tests to ensure entity state changes happen correctly. */ -public class EntityStateServiceTest { +public class EntityStateServiceTest extends AbstractSchedulerTestBase{ private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class); - @BeforeMethod - public void setUp() { - ((InMemoryStateStore) AbstractStateStore.get()).clear(); + @BeforeClass + public void setup() throws Exception { + StartupProperties.get().setProperty("falcon.state.store.impl", + "org.apache.falcon.state.store.InMemoryStateStore"); + super.setup(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + } + + @AfterMethod + public void setUp() throws StateStoreException { + AbstractStateStore.get().clear(); } // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume @Test - public void testLifeCycle() throws FalconException { + public void testLifeCycle() throws Exception { Process mockEntity = new Process(); mockEntity.setName("test"); - + storeEntity(EntityType.PROCESS, "test"); StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener); EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next(); Mockito.verify(listener).onSubmit(mockEntity); Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED)); StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener); Mockito.verify(listener).onSchedule(mockEntity); + entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next(); Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED)); StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener); Mockito.verify(listener).onSuspend(mockEntity); + entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next(); Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED)); StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener); Mockito.verify(listener).onResume(mockEntity); + entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next(); Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED)); } @Test - public void testInvalidTransitions() throws FalconException { + public void testInvalidTransitions() throws Exception { Feed mockEntity = new Feed(); mockEntity.setName("test"); + storeEntity(EntityType.FEED, "test"); StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener); // Attempt suspending a submitted entity try { @@ -99,10 +116,10 @@ public class EntityStateServiceTest { @Test(dataProvider = "state_and_events") public void testIdempotency(EntityState.STATE state, EntityState.EVENT event) - throws InvalidStateTransitionException { + throws Exception { Process mockEntity = new Process(); mockEntity.setName("test"); - + storeEntity(EntityType.PROCESS, "test"); EntityState entityState = new EntityState(mockEntity).setCurrentState(state); entityState.nextTransition(event); Assert.assertEquals(entityState.getCurrentState(), state); http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java index 43c3c54..f0ae7b2 100644 --- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java +++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java @@ -23,11 +23,12 @@ import org.apache.falcon.exception.InvalidStateTransitionException; import org.apache.falcon.exception.StateStoreException; import org.apache.falcon.execution.ProcessExecutionInstance; import org.apache.falcon.state.store.AbstractStateStore; -import org.apache.falcon.state.store.InMemoryStateStore; +import org.apache.falcon.util.StartupProperties; import org.joda.time.DateTime; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -40,6 +41,12 @@ public class InstanceStateServiceTest { private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class); private ProcessExecutionInstance mockInstance; + @BeforeClass + public void init() { + StartupProperties.get().setProperty("falcon.state.store.impl", + "org.apache.falcon.state.store.InMemoryStateStore"); + } + @BeforeMethod public void setup() { Process testProcess = new Process(); @@ -47,13 +54,14 @@ public class InstanceStateServiceTest { // Setup new mocks so we can verify the no. of invocations mockInstance = Mockito.mock(ProcessExecutionInstance.class); Mockito.when(mockInstance.getEntity()).thenReturn(testProcess); + Mockito.when(mockInstance.getCreationTime()).thenReturn(DateTime.now()); Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now()); Mockito.when(mockInstance.getCluster()).thenReturn("testCluster"); } @AfterMethod - public void tearDown() { - ((InMemoryStateStore) AbstractStateStore.get()).clear(); + public void tearDown() throws StateStoreException { + AbstractStateStore.get().clear(); } // Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running @@ -67,18 +75,28 @@ public class InstanceStateServiceTest { Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener); Mockito.verify(listener).onConditionsMet(mockInstance); + instanceFromStore = AbstractStateStore.get() + .getExecutionInstance(new InstanceID(mockInstance)); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener); Mockito.verify(listener).onSchedule(mockInstance); + instanceFromStore = AbstractStateStore.get() + .getExecutionInstance(new InstanceID(mockInstance)); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener); Mockito.verify(listener).onSuspend(mockInstance); + instanceFromStore = AbstractStateStore.get() + .getExecutionInstance(new InstanceID(mockInstance)); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING, listener); Mockito.verify(listener).onResume(mockInstance); + instanceFromStore = AbstractStateStore.get() + .getExecutionInstance(new InstanceID(mockInstance)); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING)); StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener); Mockito.verify(listener).onSuccess(mockInstance); + instanceFromStore = AbstractStateStore.get() + .getExecutionInstance(new InstanceID(mockInstance)); Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED)); Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0); } http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java new file mode 100644 index 0000000..ecd5293 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.service; + +import org.apache.falcon.FalconException; +import org.apache.falcon.state.AbstractSchedulerTestBase; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.persistence.EntityManager; +import java.io.IOException; +import java.util.Map; + +/** + * Test cases for FalconJPAService. + */ +public class TestFalconJPAService extends AbstractSchedulerTestBase { + + private static FalconJPAService falconJPAService = FalconJPAService.get(); + + @BeforeClass + public void setUp() throws Exception { + super.setup(); + createDB(DB_SQL_FILE); + } + + @Test + public void testService() throws FalconException { + // initialize it + falconJPAService.init(); + EntityManager entityManager = falconJPAService.getEntityManager(); + Map<String, Object> props = entityManager.getProperties(); + Assert.assertNotNull(props); + entityManager.close(); + } + + @AfterClass + public void tearDown() throws FalconException, IOException { + falconJPAService.destroy(); + super.cleanup(); + } + + + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java new file mode 100644 index 0000000..6d5bd49 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.state.service.store; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.exception.StateStoreException; +import org.apache.falcon.execution.ExecutionInstance; +import org.apache.falcon.execution.FalconExecutionService; +import org.apache.falcon.execution.MockDAGEngine; +import org.apache.falcon.execution.NotificationHandler; +import org.apache.falcon.notification.service.impl.AlarmService; +import org.apache.falcon.notification.service.impl.DataAvailabilityService; +import org.apache.falcon.notification.service.impl.JobCompletionService; +import org.apache.falcon.notification.service.impl.SchedulerService; +import org.apache.falcon.predicate.Predicate; +import org.apache.falcon.service.Services; +import org.apache.falcon.state.AbstractSchedulerTestBase; +import org.apache.falcon.state.EntityClusterID; +import org.apache.falcon.state.EntityID; +import org.apache.falcon.state.EntityState; +import org.apache.falcon.state.ID; +import org.apache.falcon.state.InstanceID; +import org.apache.falcon.state.InstanceState; +import org.apache.falcon.state.store.jdbc.BeanMapperUtil; +import org.apache.falcon.state.store.jdbc.JDBCStateStore; +import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.state.store.service.FalconJPAService; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.engine.DAGEngine; +import org.apache.falcon.workflow.engine.DAGEngineFactory; +import org.apache.falcon.workflow.engine.OozieDAGEngine; +import org.joda.time.DateTime; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +/** + * Test cases for JDBCStateStore. + */ +public class TestJDBCStateStore extends AbstractSchedulerTestBase { + private static StateStore stateStore = JDBCStateStore.get(); + private static Random randomValGenerator = new Random(); + private static FalconJPAService falconJPAService = FalconJPAService.get(); + private AlarmService mockTimeService; + private DataAvailabilityService mockDataService; + private SchedulerService mockSchedulerService; + private JobCompletionService mockCompletionService; + private DAGEngine dagEngine; + + @BeforeClass + public void setup() throws Exception { + super.setup(); + createDB(DB_SQL_FILE); + falconJPAService.init(); + this.dfsCluster = EmbeddedCluster.newCluster("testCluster"); + this.conf = dfsCluster.getConf(); + registerServices(); + } + + private void registerServices() throws FalconException { + mockTimeService = Mockito.mock(AlarmService.class); + Mockito.when(mockTimeService.getName()).thenReturn("AlarmService"); + Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + mockDataService = Mockito.mock(DataAvailabilityService.class); + Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService"); + Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + dagEngine = Mockito.mock(OozieDAGEngine.class); + Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class)); + mockSchedulerService = Mockito.mock(SchedulerService.class); + Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService"); + StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName()); + StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName()); + dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster")); + Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + mockCompletionService = Mockito.mock(JobCompletionService.class); + Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService"); + Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class), + Mockito.any(ID.class))).thenCallRealMethod(); + Services.get().register(mockTimeService); + Services.get().register(mockDataService); + Services.get().register(mockSchedulerService); + Services.get().register(mockCompletionService); + } + + + @Test + public void testInsertRetrieveAndUpdate() throws Exception { + EntityState entityState = getEntityState(EntityType.PROCESS, "process"); + stateStore.putEntity(entityState); + EntityID entityID = new EntityID(entityState.getEntity()); + EntityState actualEntityState = stateStore.getEntity(entityID); + Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity()); + Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState()); + try { + stateStore.putEntity(entityState); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + //no op + } + + entityState.setCurrentState(EntityState.STATE.SCHEDULED); + stateStore.updateEntity(entityState); + actualEntityState = stateStore.getEntity(entityID); + Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity()); + Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState()); + + stateStore.deleteEntity(entityID); + boolean entityExists = stateStore.entityExists(entityID); + Assert.assertEquals(entityExists, false); + + try { + stateStore.getEntity(entityID); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e){ + // no op + } + + try { + stateStore.updateEntity(entityState); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + // no op + } + + try { + stateStore.deleteEntity(entityID); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e){ + // no op + } + } + + + @Test + public void testGetEntities() throws Exception { + EntityState entityState1 = getEntityState(EntityType.PROCESS, "process1"); + EntityState entityState2 = getEntityState(EntityType.PROCESS, "process2"); + EntityState entityState3 = getEntityState(EntityType.FEED, "feed1"); + + Collection<EntityState> result = stateStore.getAllEntities(); + Assert.assertEquals(result.size(), 0); + + stateStore.putEntity(entityState1); + stateStore.putEntity(entityState2); + stateStore.putEntity(entityState3); + + result = stateStore.getAllEntities(); + Assert.assertEquals(result.size(), 3); + + Collection<Entity> entities = stateStore.getEntities(EntityState.STATE.SUBMITTED); + Assert.assertEquals(entities.size(), 3); + } + + + @Test + public void testInstanceInsertionAndUpdate() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + EntityState entityState = getEntityState(EntityType.PROCESS, "process"); + ExecutionInstance executionInstance = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis(), "cluster", System.currentTimeMillis()); + InstanceState instanceState = new InstanceState(executionInstance); + initInstanceState(instanceState); + stateStore.putExecutionInstance(instanceState); + InstanceID instanceID = new InstanceID(instanceState.getInstance()); + InstanceState actualInstanceState = stateStore.getExecutionInstance(instanceID); + Assert.assertEquals(actualInstanceState, instanceState); + + instanceState.setCurrentState(InstanceState.STATE.RUNNING); + Predicate predicate = new Predicate(Predicate.TYPE.DATA); + instanceState.getInstance().getAwaitingPredicates().add(predicate); + + stateStore.updateExecutionInstance(instanceState); + actualInstanceState = stateStore.getExecutionInstance(instanceID); + Assert.assertEquals(actualInstanceState, instanceState); + + try { + stateStore.putExecutionInstance(instanceState); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + // no op + } + + stateStore.deleteExecutionInstance(instanceID); + + try { + stateStore.getExecutionInstance(instanceID); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + // no op + } + + try { + stateStore.deleteExecutionInstance(instanceID); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + // no op + } + + try { + stateStore.updateExecutionInstance(instanceState); + Assert.fail("Exception must have been thrown"); + } catch (StateStoreException e) { + // no op + } + } + + + @Test + public void testBulkInstanceOperations() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + EntityState entityState = getEntityState(EntityType.PROCESS, "process1"); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.READY); + + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis(), "cluster1", System.currentTimeMillis()); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.RUNNING); + + ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + System.currentTimeMillis(), "cluster2", System.currentTimeMillis()); + InstanceState instanceState3 = new InstanceState(processExecutionInstance3); + instanceState3.setCurrentState(InstanceState.STATE.READY); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + stateStore.putExecutionInstance(instanceState3); + + Collection<InstanceState> actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), + "cluster1"); + Assert.assertEquals(actualInstances.size(), 2); + Assert.assertEquals(actualInstances.toArray()[0], instanceState1); + Assert.assertEquals(actualInstances.toArray()[1], instanceState2); + + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), + "cluster2"); + Assert.assertEquals(actualInstances.size(), 1); + Assert.assertEquals(actualInstances.toArray()[0], instanceState3); + + List<InstanceState.STATE> states = new ArrayList<>(); + states.add(InstanceState.STATE.READY); + + actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states); + Assert.assertEquals(actualInstances.size(), 1); + Assert.assertEquals(actualInstances.toArray()[0], instanceState1); + + EntityClusterID entityClusterID = new EntityClusterID(entityState.getEntity(), "testCluster"); + actualInstances = stateStore.getExecutionInstances(entityClusterID, states); + Assert.assertEquals(actualInstances.size(), 2); + Assert.assertEquals(actualInstances.toArray()[0], instanceState1); + Assert.assertEquals(actualInstances.toArray()[1], instanceState3); + + states.add(InstanceState.STATE.RUNNING); + actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states); + Assert.assertEquals(actualInstances.size(), 2); + Assert.assertEquals(actualInstances.toArray()[0], instanceState1); + Assert.assertEquals(actualInstances.toArray()[1], instanceState2); + + InstanceState lastInstanceState = stateStore.getLastExecutionInstance(entityState.getEntity(), "cluster1"); + Assert.assertEquals(lastInstanceState, instanceState2); + + + InstanceID instanceKey = new InstanceID(instanceState3.getInstance()); + stateStore.deleteExecutionInstance(instanceKey); + + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2"); + Assert.assertEquals(actualInstances.size(), 0); + + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1"); + Assert.assertEquals(actualInstances.size(), 2); + + stateStore.putExecutionInstance(instanceState3); + + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2"); + Assert.assertEquals(actualInstances.size(), 1); + + stateStore.deleteExecutionInstances(entityClusterID.getEntityID()); + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1"); + Assert.assertEquals(actualInstances.size(), 0); + + actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2"); + Assert.assertEquals(actualInstances.size(), 0); + + } + + + @Test + public void testGetExecutionInstancesWithRange() throws Exception { + storeEntity(EntityType.CLUSTER, "testCluster"); + storeEntity(EntityType.FEED, "clicksFeed"); + storeEntity(EntityType.FEED, "clicksSummary"); + + long instance1Time = System.currentTimeMillis() - 180000; + long instance2Time = System.currentTimeMillis(); + EntityState entityState = getEntityState(EntityType.PROCESS, "process1"); + ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance1Time, "cluster1", instance1Time); + InstanceState instanceState1 = new InstanceState(processExecutionInstance1); + instanceState1.setCurrentState(InstanceState.STATE.RUNNING); + + ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance( + entityState.getEntity().getEntityType(), entityState.getEntity(), + instance2Time, "cluster1", instance2Time); + InstanceState instanceState2 = new InstanceState(processExecutionInstance2); + instanceState2.setCurrentState(InstanceState.STATE.RUNNING); + + stateStore.putExecutionInstance(instanceState1); + stateStore.putExecutionInstance(instanceState2); + + List<InstanceState.STATE> states = new ArrayList<>(); + states.add(InstanceState.STATE.RUNNING); + + Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), + "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000)); + Assert.assertEquals(1, actualInstances.size()); + Assert.assertEquals(instanceState1, actualInstances.toArray()[0]); + + actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), + "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000)); + Assert.assertEquals(1, actualInstances.size()); + Assert.assertEquals(instanceState2, actualInstances.toArray()[0]); + + } + + + private void initInstanceState(InstanceState instanceState) { + instanceState.setCurrentState(InstanceState.STATE.READY); + instanceState.getInstance().setExternalID(RandomStringUtils.randomNumeric(6)); + instanceState.getInstance().setInstanceSequence(randomValGenerator.nextInt()); + instanceState.getInstance().setActualStart(new DateTime(System.currentTimeMillis())); + instanceState.getInstance().setActualEnd(new DateTime(System.currentTimeMillis())); + List<Predicate> predicates = new ArrayList<>(); + Predicate predicate = new Predicate(Predicate.TYPE.JOB_COMPLETION); + predicates.add(predicate); + instanceState.getInstance().setAwaitingPredicates(predicates); + } + + private EntityState getEntityState(EntityType entityType, String name) throws Exception { + storeEntity(entityType, name); + Entity entity = getStore().get(entityType, name); + Assert.assertNotNull(entity); + return new EntityState(entity); + } + + @AfterTest + public void cleanUpTables() throws StateStoreException { + stateStore.deleteEntities(); + stateStore.deleteExecutionInstances(); + } + + @AfterClass + public void cleanup() throws IOException { + super.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java new file mode 100644 index 0000000..8a42830 --- /dev/null +++ b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.tools; + + +import org.apache.falcon.state.AbstractSchedulerTestBase; +import org.apache.falcon.util.BuildProperties; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; + +/** + * Tests for DB operations tool. + */ +public class TestFalconStateStoreDBCLI extends AbstractSchedulerTestBase { + + @BeforeClass + public void setup() throws Exception { + super.setup(); + } + + @AfterClass + public void cleanup() throws IOException { + super.cleanup(); + } + + + @Test + public void testFalconDBCLI() throws Exception { + File sqlFile = new File(DB_SQL_FILE); + String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" }; + int result = execDBCLICommands(argsCreate); + Assert.assertEquals(0, result); + Assert.assertTrue(sqlFile.exists()); + + ByteArrayOutputStream data = new ByteArrayOutputStream(); + PrintStream oldOut = System.out; + try { + // show versions + System.setOut(new PrintStream(data)); + String[] argsVersion = { "version" }; + Assert.assertEquals(0, execDBCLICommands(argsVersion)); + Assert.assertTrue(data.toString().contains("db.version: " + + BuildProperties.get().getProperty("project.version"))); + // show help information + data.reset(); + String[] argsHelp = { "help" }; + Assert.assertEquals(0, execDBCLICommands(argsHelp)); + Assert.assertTrue(data.toString().contains("falcondb create <OPTIONS> : Create Falcon DB schema")); + Assert.assertTrue(data.toString().contains("falcondb upgrade <OPTIONS> : Upgrade Falcon DB schema")); + // try run invalid command + data.reset(); + String[] argsInvalidCommand = { "invalidCommand" }; + Assert.assertEquals(1, execDBCLICommands(argsInvalidCommand)); + } finally { + System.setOut(oldOut); + } + // generate an upgrade script + File update = new File(DB_UPDATE_SQL_FILE); + + String[] argsUpgrade = { "upgrade", "-sqlfile", update.getAbsolutePath(), "-run" }; + BuildProperties.get().setProperty("project.version", "99999-SNAPSHOT"); + Assert.assertEquals(0, execDBCLICommands(argsUpgrade)); + + Assert.assertTrue(update.exists()); + } + +}
