Updated Branches: refs/heads/trunk ad12695b5 -> 03fa9c530
http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/aop/build/aop.xml ---------------------------------------------------------------------- diff --git a/src/test/aop/build/aop.xml b/src/test/aop/build/aop.xml new file mode 100644 index 0000000..77320aa --- /dev/null +++ b/src/test/aop/build/aop.xml @@ -0,0 +1,154 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project name="aspects"> + <!-- Properties common for all fault injections --> + <property name="build-fi.dir" location="${basedir}/build-fi"/> + <property name="sqoop-fi.jar" location="${build.dir}/${final.name}-fi.jar"/> + <property name="compile-inject.output" + value="${build-fi.dir}/compile-fi.log"/> + <property name="aspectversion" value="1.6.11"/> + <property name="javac.version" value="1.6"/> + <property file="${basedir}/build.properties"/> + + <!-- All Fault Injection (FI) related targets are located in this section --> + + <target name="clean-fi"> + <delete dir="${build-fi.dir}"/> + </target> + + <!-- Weaving aspects in place + Later on one can run 'ant jar-fault-inject' to create + a jar file with instrumented classes. This target fails when a line of ${compile-inject.output} matches 'iajc.*warning'. --> + + <target name="compile-fault-inject" depends="compile, compile-test"> + <!-- AspectJ task definition --> + <taskdef + resource="org/aspectj/tools/ant/taskdefs/aspectjTaskdefs.properties"> + <classpath> + <pathelement + location="${build.ivy.lib.dir}/sqoop/hadoop${hadoopversion}test/aspectjtools-${aspectversion}.jar"/> + </classpath> + </taskdef> + <echo message="Start weaving aspects in place"/> + <iajc + encoding="${build.encoding}" + srcdir="${src.dir.path}" + includes="**/org/apache/sqoop/**/*.java, **/org/apache/sqoop/**/*.aj" + destDir="${build-fi.dir}/classes" + debug="${javac.debug}" + target="${javac.version}" + source="${javac.version}" + deprecation="${javac.deprecation}" + fork="true" + maxmem="1024m" + > + + <classpath> + <path refid="test.classpath"/> + <fileset dir="${build-fi.dir}/test"> + <include name="**/*.jar" /> + <exclude name="**/excluded/" /> + </fileset> + </classpath> + </iajc> + <loadfile property="injection.failure" srcfile="${compile-inject.output}"> + <filterchain> + <linecontainsregexp> + <regexp pattern='iajc.*warning'/> + </linecontainsregexp> + </filterchain> + </loadfile> + <fail if="injection.failure"> + Broken binding of advises: ${line.separator}${injection.failure} + </fail> + <echo 
message="Weaving of aspects is finished"/> + </target> + + <target name="fi-init"> + <mkdir dir="${build-fi.dir}"/> + </target> + + <target name="injectfaults" + description="Instrument classes with faults and other AOP advices"> + <mkdir dir="${build-fi.dir}"/> + <delete file="${compile-inject.output}"/> + <echo message="In injectfaults ${src.dir}"/> + <weave-injectfault-aspects dest.dir="${build-fi.dir}/classes" + src.dir="${base.src.dir}"> + </weave-injectfault-aspects> + + </target> + <macrodef name="weave-injectfault-aspects"> + <attribute name="dest.dir" /> + <attribute name="src.dir" /> + <sequential> + <subant buildpath="build.xml" target="compile-fault-inject" + output="${compile-inject.output}"> + <property name="build.dir" value="${build-fi.dir}" /> + <property name="src.dir.path" value="@{src.dir}" /> + <property name="dest.dir" value="@{dest.dir}" /> + </subant> + </sequential> + </macrodef> + <macrodef name="macro-run-tests-fault-inject"> + <attribute name="target.name" /> + <attribute name="testcasesonly" /> + <sequential> + <subant buildpath="build.xml" target="@{target.name}"> + <property name="build.dir" value="${build-fi.dir}"/> + <property name="test.fault.inject" value="yes"/> + <property name="test.include" value="TestFi*"/> + <property name="test.timeout" value="3000000"/> + <!-- This one is needed for the special "regression" target only --> + <property name="special.fi.testcasesonly" value="@{testcasesonly}"/> + </subant> + </sequential> + </macrodef> + + <!-- ================================================================== --> + <!-- Make sqoop-fi.jar including all Fault injected artifacts --> + <!-- ================================================================== --> + <macrodef name="macro-jar-fault-inject"> + <attribute name="target.name" /> + <attribute name="build.dir" /> + <attribute name="jar.final.name" /> + <attribute name="jar.final.value" /> + <sequential> + <subant buildpath="build.xml" target="@{target.name}"> + <property 
name="build.dir" value="@{build.dir}"/> + <property name="@{jar.final.name}" value="@{jar.final.value}"/> + <property name="jar.extra.properties.list" + value="${test.dir}/fi-site.xml" /> + </subant> + </sequential> + </macrodef> + <!-- ================================================================== --> + <!-- Make test jar files including all Fault Injected artifacts --> + <!-- ================================================================== --> + <macrodef name="macro-jar-test-fault-inject"> + <attribute name="target.name" /> + <attribute name="jar.final.name" /> + <attribute name="jar.final.value" /> + <sequential> + <echo message="@{jar.final.value}"/> + <subant buildpath="build.xml" target="@{target.name}"> + <property name="build.dir" value="${build-fi.dir}"/> + <property name="@{jar.final.name}" + value="@{jar.final.value}"/> + </subant> + </sequential> + </macrodef> + <!-- End of Fault Injection (FI) related section --> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/aop/org/apache/sqoop/fi/FiConfig.java ---------------------------------------------------------------------- diff --git a/src/test/aop/org/apache/sqoop/fi/FiConfig.java b/src/test/aop/org/apache/sqoop/fi/FiConfig.java new file mode 100644 index 0000000..0194407 --- /dev/null +++ b/src/test/aop/org/apache/sqoop/fi/FiConfig.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.fi; + +import org.apache.hadoop.conf.Configuration; + + +/** + * This class wraps the logic around the fault injection configuration file. + * The default file is expected to be found in src/test/fi-site.xml + * This default file should be copied by JUnit Ant's tasks to + * build/test/extraconf folder before tests are run. + * An alternative file name can be set through + * -Dfi.config=<file_name> + */ +public class FiConfig { + private static final String CONFIG_PARAMETER = ProbabilityModel.FPROB_NAME + + "config"; + private static final String DEFAULT_CONFIG = "fi-site.xml"; + private static Configuration conf; + static { + if (conf == null) { + conf = new Configuration(false); + String configName = System.getProperty(CONFIG_PARAMETER, DEFAULT_CONFIG); + conf.addResource(configName); + } + } + + /** + * Provides access to the shared Configuration that is created once, when this class is initialized. + * + * @return Configuration initialized with fault injection's parameters + */ + public static Configuration getConfig() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/aop/org/apache/sqoop/fi/ProbabilityModel.java ---------------------------------------------------------------------- diff --git a/src/test/aop/org/apache/sqoop/fi/ProbabilityModel.java b/src/test/aop/org/apache/sqoop/fi/ProbabilityModel.java new file mode 100644 index 0000000..5de90ed --- /dev/null +++ b/src/test/aop/org/apache/sqoop/fi/ProbabilityModel.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.fi; + +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * This class is responsible for the decision of when a fault + * has to be triggered within a class of Hadoop + * + * Default probability of injection is set to 0%. To change it + * one can set the sys. prop. -Dfi.*=<new probability level> + * Another way to do so is to set this level through FI config file, + * located under src/test/fi-site.xml + * + * To change the level one has to specify the following sys. prop.: + * -Dfi.<name of fault location>=<probability level> in the runtime + * Probability level is specified by a float between 0.0 and 1.0 + * + * <name of fault location> might be represented by a short classname + * or otherwise. 
This decision is left up to the discretion of aspects + * developer, but has to be consistent through the code + */ +public class ProbabilityModel { + private static Random generator = new Random(); + private static final Log LOG = LogFactory.getLog(ProbabilityModel.class); + + static final String FPROB_NAME = "fi."; + private static final String ALL_PROBABILITIES = FPROB_NAME + "*"; + private static final float DEFAULT_PROB = 0.00f; //Default probability is 0% + private static final float MAX_PROB = 1.00f; // Max probability is 100% + + private static Configuration conf = FiConfig.getConfig(); + + static { + // Set new default probability if specified through a system property; otherwise keep the level from the FI config. + // If neither is specified set default probability to DEFAULT_PROB + conf.set(ALL_PROBABILITIES, + System.getProperty(ALL_PROBABILITIES, + conf.get(ALL_PROBABILITIES, Float.toString(DEFAULT_PROB)))); + + LOG.info(ALL_PROBABILITIES + "=" + conf.get(ALL_PROBABILITIES)); + } + + /** + * Simplistic method to check if we have reached the point of injection. + * @param klassName is the name of the probability level to check. + * If a configuration has been set for "fi.myClass" then you can check if the + * inject criteria has been reached by calling this method with "myClass" + * string as its parameter + * @return true if a fresh random float is below the probability level of klassName; false otherwise + */ + public static boolean injectCriteria(String klassName) { + boolean trigger = false; + if (generator.nextFloat() < getProbability(klassName)) { + trigger = true; + } + return trigger; + } + + /** + * This primitive checks for arbitrary set of desired probability. If the + * level hasn't been set method will return default setting. 
+ * The probability is expected to be set as a float between 0.0 and 1.0 + * @param klass is the name of the resource + * @return float representation of configured probability level of + * the requested resource or default value if hasn't been set. NOTE(review): unless -Dfi.<klass> is set, the level stored for klass is overwritten with the current fi.* value on every call - confirm this is intended. + */ + protected static float getProbability(final String klass) { + String newProbName = FPROB_NAME + klass; + + String newValue = System.getProperty(newProbName, + conf.get(ALL_PROBABILITIES)); + if (newValue != null && !newValue.equals(conf.get(newProbName))) + conf.set(newProbName, newValue); + + float ret = conf.getFloat(newProbName, + conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB)); + LOG.debug("Request for " + newProbName + " returns=" + ret); + // Make sure that probability level is valid; otherwise fall back to the fi.* level. + if (ret < DEFAULT_PROB || ret > MAX_PROB) { + LOG.info("Probability level is incorrect. Default value is set"); + ret = conf.getFloat(ALL_PROBABILITIES, DEFAULT_PROB); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/aop/org/apache/sqoop/mapreduce/SqlServerExportAspect.aj ---------------------------------------------------------------------- diff --git a/src/test/aop/org/apache/sqoop/mapreduce/SqlServerExportAspect.aj b/src/test/aop/org/apache/sqoop/mapreduce/SqlServerExportAspect.aj new file mode 100644 index 0000000..18481c3 --- /dev/null +++ b/src/test/aop/org/apache/sqoop/mapreduce/SqlServerExportAspect.aj @@ -0,0 +1,100 @@ +package org.apache.sqoop.mapreduce; + +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.List; +import org.apache.sqoop.fi.ProbabilityModel; +import org.apache.sqoop.lib.SqoopRecord; +import java.util.Random; +/** +* This aspect injects faults into the SQLServerExportDBExecThread +* which handles executing the write batch to the sql server database +*/ +public privileged aspect SqlServerExportAspect { + + // Before adding to these, note that every sqlstate must have a + // corresponding 
exceptionMsg and vice versa!!!!!! + private static String[] exceptionMsg = { "Connection reset", + "deadlocked on thread", + "Connection initialization error", + "Connection link failure" + }; + + private static String[] sqlStates = { "08S01", // covers SQL error states + // 40143 40197, 40501, 40613, + // 10054, 10053,64 + "40001", // SQL Error 1205, + // deadlock victim + "01000", // SQL Error 233, + // connection init failure + "08001", // 10060 connection link + // init failure + }; + + private static boolean allFaults = false; + + // export pointcut and advice: the around advice never calls proceed(), so it runs in place of executeStatement and has two fault injection points + pointcut ExportExecuteStatementPointcut(SQLServerExportDBExecThread thread, + PreparedStatement stmt, + List<SqoopRecord> records): + execution (protected void SQLServerAsyncDBExecThread.executeStatement( + PreparedStatement, List<SqoopRecord>)) + && target(thread) && args(stmt, records); + + void around(SQLServerExportDBExecThread thread,PreparedStatement stmt, + List<SqoopRecord> records) throws SQLException: + ExportExecuteStatementPointcut(thread, stmt, records) { + + + Random random = new Random(); + + int exceptionToThrow = 0; + if (allFaults) + { + exceptionToThrow = random.nextInt(sqlStates.length); + } + thread.LOG.info("exception to be thrown is " + exceptionToThrow); + + // start the method like normal, execute the batch + Connection conn = thread.getConnection(); + try { + // first injection point: close the connection and throw before the batch is executed + if (ProbabilityModel.injectCriteria("SQLServerExportDBExecThread")) { + thread.LOG.info("throwing " + exceptionMsg[exceptionToThrow] + + " exception before execute"); + conn.close(); + throw new SQLException(exceptionMsg[exceptionToThrow], + sqlStates[exceptionToThrow]); + } + stmt.executeBatch(); + } catch (SQLException execSqlEx) { + thread.LOG.warn("Error executing statement: " + execSqlEx); + //conn.rollback(); + if (thread.failedCommit && + thread.canIgnoreForFailedCommit(execSqlEx.getSQLState())){ + thread.LOG.info("Ignoring error after failed 
commit"); + } else { + throw execSqlEx; + } + } + + // If the batch of records is executed successfully, then commit before + // processing the next batch of records (second injection point: throw instead of committing) + try { + if (ProbabilityModel.injectCriteria("SQLServerExportDBExecThread")) { + thread.LOG.info("throwing " + exceptionMsg[exceptionToThrow] + + " exception during commit"); + conn.close(); + throw new SQLException(exceptionMsg[exceptionToThrow], + sqlStates[exceptionToThrow]); + } + conn.commit(); + thread.failedCommit = false; + } catch (SQLException commitSqlEx) { + thread.LOG.warn("Error while committing transactions: " + commitSqlEx); + thread.failedCommit = true; + throw commitSqlEx; + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/aop/org/apache/sqoop/mapreduce/db/SqlServerImportAspect.aj ---------------------------------------------------------------------- diff --git a/src/test/aop/org/apache/sqoop/mapreduce/db/SqlServerImportAspect.aj b/src/test/aop/org/apache/sqoop/mapreduce/db/SqlServerImportAspect.aj new file mode 100644 index 0000000..518d218 --- /dev/null +++ b/src/test/aop/org/apache/sqoop/mapreduce/db/SqlServerImportAspect.aj @@ -0,0 +1,97 @@ +package org.apache.sqoop.mapreduce.db; + +import java.sql.SQLException; +import org.apache.sqoop.fi.ProbabilityModel; +import java.sql.ResultSet; +import java.io.IOException; +import java.sql.Connection; +import java.lang.Math; +import java.util.Random; +/** +* This aspect forces sql connection exceptions and long backoff times +* class +*/ +public privileged aspect SqlServerImportAspect { + + // Before adding to these, note that every sqlstate must have a + // corresponding exceptionMsg and vice versa!!!!!! 
+ private static String[] exceptionMsg = { "Connection reset", + "deadlocked on thread", + "Connection initialization error", + "Connection link failure" + }; + + private static String[] sqlStates = { "08S01", // covers SQL error states + // 40143 40197, 40501, 40613, + // 10054, 10053,64 + "40001", // SQL Error 1205, deadlock victim + "01000", //SQL Error 233, connection + // init failure + "08001", //10060 connection link/ + // init failure + }; + + private static final boolean allFaults = false; + + // import pointcut, throw a SQL Exception as if the connection was reset + // during a database read (the advice runs after executeQuery has returned normally) + pointcut ImportQueryPointcut(): + execution (protected ResultSet DBRecordReader.executeQuery(String)) + && target(DBRecordReader); + + after() returning throws SQLException : ImportQueryPointcut() { + Random random = new Random(); + int exceptionToThrow = 0; + if (allFaults) + { + exceptionToThrow = random.nextInt(sqlStates.length); + } + DBRecordReader.LOG.info("exception to be thrown is " + exceptionToThrow); + DBRecordReader.LOG.info("Hitting import execute query pointcut," + + " return a SQL Exception after reading rows"); + if (ProbabilityModel.injectCriteria("DBRecordReader")) { + DBRecordReader.LOG.info("throwing " + exceptionMsg[exceptionToThrow] + + " exception after reading"); + throw new SQLException(exceptionMsg[exceptionToThrow], + sqlStates[exceptionToThrow]); + } + } + + // connection reset pointcut. Make the backoff wait time the maximum time + pointcut ConnectionResetPointcut(SQLServerConnectionFailureHandler handler): + execution (public Connection BasicRetrySQLFailureHandler.recover()) + && target(handler); + + before (SQLServerConnectionFailureHandler handler) + throws IOException : ConnectionResetPointcut(handler) { + handler.LOG.info("Hitting connection reset pointcut. 
" + + "waiting max time of " + handler.DEFAULT_RETRY_WAIT_MAX + + " and interval default is " + handler.DEFAULT_RETRY_WAIT_INTERVAL); + + // calculate the max number of retries by solving for numRetries + // in the retry logic + // where timeToWait = retryNum^2 * DEFAULT_RETRY_WAIT_INTERVAL + // so therefore since we want to know the number of retries it + // takes to get the DEFAULT_RETRY_WAIT_MAX we solve for retryNum; maxTimeWait then sums the wait of every retry from 0 up to that number + long maxNumRetries = (long)Math.ceil(Math.sqrt + ((double)handler.DEFAULT_RETRY_WAIT_MAX + /handler.DEFAULT_RETRY_WAIT_INTERVAL)); + + long maxTimeWait = 0; + for (double i = 0; i <= maxNumRetries; i++) + { + maxTimeWait += (long)(Math.pow(i, 2) + * (double)handler.DEFAULT_RETRY_WAIT_INTERVAL); + } + handler.LOG.info("Maximum retries possible is " + maxNumRetries + + " and maximum time to wait is " + maxTimeWait); + if (ProbabilityModel.injectCriteria("SQLServerConnectionFailureHandler")) { + try { + handler.LOG.info("sleeping waiting for a connection for max time"); + Thread.sleep(maxTimeWait); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/03fa9c53/src/test/fi-site.xml ---------------------------------------------------------------------- diff --git a/src/test/fi-site.xml b/src/test/fi-site.xml new file mode 100644 index 0000000..1e101b1 --- /dev/null +++ b/src/test/fi-site.xml @@ -0,0 +1,31 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- Put fault injection specific property overrides in this file. A -Dfi.* system property takes precedence over the fi.* value set here. --> + +<configuration> + <property> + <name>fi.*</name> + <value>0.10</value> + <description> + Default probability level for all injected faults specified + as a floating number between 0 and 1.00 + </description> + </property> +</configuration>
