Updated Branches: refs/heads/trunk 0f0066f52 -> 0b465594d
SQOOP-638: Add an optional, simple and extensible validation framework for sqoop (Venkatesh Seetharam via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0b465594 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0b465594 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0b465594 Branch: refs/heads/trunk Commit: 0b465594d24827c5a8d28e81ed3487e82937a72b Parents: 0f0066f Author: Jarek Jarcec Cecho <[email protected]> Authored: Sat Dec 1 13:00:47 2012 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sat Dec 1 13:00:47 2012 -0800 ---------------------------------------------------------------------- src/docs/user/SqoopUserGuide.txt | 3 +- src/docs/user/common-args.txt | 1 - src/docs/user/export.txt | 9 + src/docs/user/import.txt | 10 + src/docs/user/validation-args.txt | 32 ++++ src/docs/user/validation.txt | 136 +++++++++++++++ src/java/com/cloudera/sqoop/mapreduce/JobBase.java | 33 ++++ src/java/org/apache/sqoop/SqoopOptions.java | 52 +++++- .../org/apache/sqoop/mapreduce/ExportJobBase.java | 26 +++ .../org/apache/sqoop/mapreduce/ImportJobBase.java | 26 +++ src/java/org/apache/sqoop/tool/BaseSqoopTool.java | 65 +++++++- src/java/org/apache/sqoop/tool/ExportTool.java | 3 + src/java/org/apache/sqoop/tool/ImportTool.java | 19 ++ .../validation/AbsoluteValidationThreshold.java | 50 ++++++ .../sqoop/validation/LogOnFailureHandler.java | 41 +++++ .../apache/sqoop/validation/RowCountValidator.java | 64 +++++++ .../apache/sqoop/validation/ValidationContext.java | 61 +++++++ .../sqoop/validation/ValidationException.java | 36 ++++ .../sqoop/validation/ValidationFailureHandler.java | 36 ++++ .../sqoop/validation/ValidationThreshold.java | 30 ++++ .../org/apache/sqoop/validation/Validator.java | 55 ++++++ .../validation/RowCountValidatorImportTest.java | 90 ++++++++++ 22 files changed, 871 insertions(+), 7 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/SqoopUserGuide.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/SqoopUserGuide.txt b/src/docs/user/SqoopUserGuide.txt index e74cf64..01ac1cf 100644 --- a/src/docs/user/SqoopUserGuide.txt +++ b/src/docs/user/SqoopUserGuide.txt @@ -54,6 +54,8 @@ include::import-all-tables.txt[] include::export.txt[] +include::validation.txt[] + include::saved-jobs.txt[] include::codegen.txt[] @@ -77,4 +79,3 @@ include::connectors.txt[] include::support.txt[] include::troubleshooting.txt[] - http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/common-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/common-args.txt b/src/docs/user/common-args.txt index eff9939..7c69886 100644 --- a/src/docs/user/common-args.txt +++ b/src/docs/user/common-args.txt @@ -36,4 +36,3 @@ Argument Description +\--connection-param-file <filename>+ Optional properties file that\ provides connection parameters ------------------------------------------------------------------------------- - http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/export.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt index ebd0ff0..9f600fe 100644 --- a/src/docs/user/export.txt +++ b/src/docs/user/export.txt @@ -42,6 +42,8 @@ another. include::common-args.txt[] +include::validation-args.txt[] + .Export control arguments: [grid="all"] `----------------------------------------`------------------------------ @@ -266,3 +268,10 @@ Sqoop attempts to insert rows which violate constraints in the database fails. 
+Another basic export to populate a table named +bar+ with validation enabled: +<<validation,More Details>> + +---- +$ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar \ + --export-dir /results/bar_data --validate +---- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/import.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index e37d1ab..82e74dd 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -48,6 +48,8 @@ include::common-args.txt[] include::connecting.txt[] +include::validation-args.txt[] + .Import control arguments: [grid="all"] `---------------------------------`-------------------------------------- @@ -677,4 +679,12 @@ $ sqoop import --connect jdbc:mysql://db.foo.com/somedb --table sometable \ --where "id > 100000" --target-dir /incremental_dataset --append ---- +An import of a table named +EMPLOYEES+ in the +corp+ database that uses +validation to validate the import using the table row count and number of +rows copied into HDFS: +<<validation,More Details>> +---- +$ sqoop import --connect jdbc:mysql://db.foo.com/corp \ + --table EMPLOYEES --validate +---- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/validation-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/validation-args.txt b/src/docs/user/validation-args.txt new file mode 100644 index 0000000..3cb5f66 --- /dev/null +++ b/src/docs/user/validation-args.txt @@ -0,0 +1,32 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + +.Validation arguments <<validation,More Details>> +[grid="all"] +`----------------------------------------`------------------------------------- +Argument Description +------------------------------------------------------------------------------- ++\--validate+ Enable validation of data copied, \ + supports single table copy only. ++\--validator <class-name>+ Specify validator class to use. ++\--validation-threshold <class-name>+ Specify validation threshold class \ + to use. ++\--validation-failurehandler <class-name>+ Specify validation failure \ + handler class to use. +------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/docs/user/validation.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/validation.txt b/src/docs/user/validation.txt new file mode 100644 index 0000000..f28b420 --- /dev/null +++ b/src/docs/user/validation.txt @@ -0,0 +1,136 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +[[validation]] ++validation+ +-------------- + + +Purpose +~~~~~~~ + +Validate the data copied, either import or export, by comparing the row +counts from the source and the target post copy. + + +Introduction +~~~~~~~~~~~~ + +There are 3 basic interfaces: +ValidationThreshold - Determines if the error margin between the source and +target is acceptable: Absolute, Percentage Tolerant, etc. +Default implementation is AbsoluteValidationThreshold which ensures the row +counts from source and target are the same. + +ValidationFailureHandler - Responsible for handling failures: log an +error/warning, abort, etc. +Default implementation is LogOnFailureHandler that logs a warning message to +the configured logger. + +Validator - Drives the validation logic by delegating the decision to +ValidationThreshold and delegating failure handling to ValidationFailureHandler. +The default implementation is RowCountValidator which validates the row +counts from source and the target. + + +Syntax +~~~~~~ + +---- +$ sqoop import (generic-args) (import-args) +$ sqoop export (generic-args) (export-args) +---- + +Validation arguments are part of import and export arguments. + + +Configuration +~~~~~~~~~~~~~ + +The validation framework is extensible and pluggable. It comes with default +implementations but the interfaces can be extended to allow custom +implementations by passing them as part of the command line arguments as +described below. 
+ + +.Validator + Property: validator + Description: Driver for validation, + must implement org.apache.sqoop.validation.Validator + Supported values: The value has to be a fully qualified class name. + Default value: org.apache.sqoop.validation.RowCountValidator + +.Validation Threshold + Property: validation-threshold + Description: Drives the decision based on the validation meeting the + threshold or not. Must implement + org.apache.sqoop.validation.ValidationThreshold + Supported values: The value has to be a fully qualified class name. + Default value: org.apache.sqoop.validation.AbsoluteValidationThreshold + +.Validation Failure Handler + Property: validation-failurehandler + Description: Responsible for handling failures, must implement + org.apache.sqoop.validation.ValidationFailureHandler + Supported values: The value has to be a fully qualified class name. + Default value: org.apache.sqoop.validation.LogOnFailureHandler + + +Limitations +~~~~~~~~~~~ + +Validation currently only validates data copied from a single table into HDFS. 
+The following are the limitations in the current implementation: + +* all-tables option +* free-form query option +* Data imported into Hive or HBase +* table import with --where argument +* incremental imports + + +Example Invocations +~~~~~~~~~~~~~~~~~~~ + +A basic import of a table named +EMPLOYEES+ in the +corp+ database that uses +validation to validate the row counts: + +---- +$ sqoop import --connect jdbc:mysql://db.foo.com/corp \ + --table EMPLOYEES --validate +---- + +A basic export to populate a table named +bar+ with validation enabled: + +---- +$ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar \ + --export-dir /results/bar_data --validate +---- + +Another example that overrides the validation args: + +---- +$ sqoop import --connect jdbc:mysql://db.foo.com/corp --table EMPLOYEES \ + --validate --validator org.apache.sqoop.validation.RowCountValidator \ + --validation-threshold \ + org.apache.sqoop.validation.AbsoluteValidationThreshold \ + --validation-failurehandler \ + org.apache.sqoop.validation.LogOnFailureHandler +---- http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/com/cloudera/sqoop/mapreduce/JobBase.java ---------------------------------------------------------------------- diff --git a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java index 0c75091..b89612d 100644 --- a/src/java/com/cloudera/sqoop/mapreduce/JobBase.java +++ b/src/java/com/cloudera/sqoop/mapreduce/JobBase.java @@ -18,10 +18,19 @@ package com.cloudera.sqoop.mapreduce; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; import com.cloudera.sqoop.SqoopOptions; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.config.ConfigurationHelper; +import org.apache.sqoop.manager.ConnManager; +import 
org.apache.sqoop.validation.*; + +import java.io.IOException; +import java.sql.SQLException; /** * @deprecated Moving to use org.apache.sqoop namespace. @@ -44,4 +53,28 @@ public class JobBase super(opts, mapperClass, inputFormatClass, outputFormatClass); } + protected long getRowCountFromDB(ConnManager connManager, String tableName) + throws SQLException { + return connManager.getTableRowCount(tableName); + } + + protected long getRowCountFromHadoop(Job job) + throws IOException, InterruptedException { + return ConfigurationHelper.getNumMapOutputRecords(job); + } + + protected void doValidate(SqoopOptions options, Configuration conf, + ValidationContext validationContext) + throws ValidationException { + Validator validator = (Validator) ReflectionUtils.newInstance( + options.getValidatorClass(), conf); + ValidationThreshold threshold = (ValidationThreshold) + ReflectionUtils.newInstance(options.getValidationThresholdClass(), + conf); + ValidationFailureHandler failureHandler = (ValidationFailureHandler) + ReflectionUtils.newInstance(options.getValidationFailureHandlerClass(), + conf); + + validator.validate(validationContext, threshold, failureHandler); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 2b5db29..613f797 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -39,6 +39,9 @@ import com.cloudera.sqoop.tool.SqoopTool; import com.cloudera.sqoop.util.RandomHash; import com.cloudera.sqoop.util.StoredAsProperty; import org.apache.sqoop.util.LoggingUtils; +import org.apache.sqoop.validation.AbsoluteValidationThreshold; +import org.apache.sqoop.validation.LogOnFailureHandler; +import org.apache.sqoop.validation.RowCountValidator; /** * Configurable state used by Sqoop 
tools. @@ -248,6 +251,13 @@ public class SqoopOptions implements Cloneable { // (JobBase, etc). private SqoopTool activeSqoopTool; + // Flag to determine if data copied needs to be validated against the source + private boolean isValidationEnabled; + // These take FQCN as input, convert them to Class in light of failing early + private Class validatorClass; // Class for the validator implementation. + private Class validationThresholdClass; // ValidationThreshold implementation + private Class validationFailureHandlerClass; // FailureHandler implementation + public SqoopOptions() { initDefaults(null); } @@ -819,6 +829,10 @@ public class SqoopOptions implements Cloneable { // We do not want to be verbose too much if not explicitly needed this.verbose = false; + this.isValidationEnabled = false; // validation is disabled by default + this.validatorClass = RowCountValidator.class; + this.validationThresholdClass = AbsoluteValidationThreshold.class; + this.validationFailureHandlerClass = LogOnFailureHandler.class; } /** @@ -1899,9 +1913,7 @@ public class SqoopOptions implements Cloneable { this.mergeKeyCol = col; } - /** - * Return the name of the column used to merge an old and new dataset. - */ + /** Return the name of the column used to merge an old and new dataset. 
*/ public String getMergeKeyCol() { return this.mergeKeyCol; } @@ -1963,5 +1975,37 @@ public class SqoopOptions implements Cloneable { public Properties getConnectionParams() { return connectionParams; } -} + public void setValidationEnabled(boolean validationEnabled) { + isValidationEnabled = validationEnabled; + } + + public boolean isValidationEnabled() { + return isValidationEnabled; + } + + public Class getValidatorClass() { + return validatorClass; + } + + public void setValidatorClass(Class validatorClazz) { + this.validatorClass = validatorClazz; + } + + public Class getValidationThresholdClass() { + return validationThresholdClass; + } + + public void setValidationThresholdClass(Class validationThresholdClazz) { + this.validationThresholdClass = validationThresholdClazz; + } + + public Class getValidationFailureHandlerClass() { + return validationFailureHandlerClass; + } + + public void setValidationFailureHandlerClass( + Class validationFailureHandlerClazz) { + this.validationFailureHandlerClass = validationFailureHandlerClazz; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java index d311ec1..5261671 100644 --- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java @@ -21,6 +21,7 @@ package org.apache.sqoop.mapreduce; import java.io.FileNotFoundException; import java.io.IOException; import java.sql.SQLException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,6 +45,7 @@ import com.cloudera.sqoop.manager.ExportJobContext; import com.cloudera.sqoop.orm.TableClassName; import com.cloudera.sqoop.mapreduce.JobBase; import 
com.cloudera.sqoop.util.ExportException; +import org.apache.sqoop.validation.*; /** * Base class for running an export MapReduce job. @@ -374,6 +376,10 @@ public class ExportJobBase extends JobBase { if (!success) { throw new ExportException("Export job failed!"); } + + if (options.isValidationEnabled()) { + validateExport(tableName, conf, job); + } } catch (InterruptedException ie) { throw new IOException(ie); } catch (ClassNotFoundException cnfe) { @@ -399,6 +405,26 @@ public class ExportJobBase extends JobBase { } } + protected void validateExport(String tableName, Configuration conf, Job job) + throws ExportException { + LOG.debug("Validating exported data."); + try { + ValidationContext validationContext = new ValidationContext( + getRowCountFromHadoop(job), + getRowCountFromDB(context.getConnManager(), tableName)); + + doValidate(options, conf, validationContext); + } catch (ValidationException e) { + throw new ExportException("Error validating row counts", e); + } catch (SQLException e) { + throw new ExportException("Error retrieving DB target row count", e); + } catch (IOException e) { + throw new ExportException("Error retrieving source row count", e); + } catch (InterruptedException e) { + throw new ExportException("Error retrieving source row count", e); + } + } + /** * @return true if the input directory contains SequenceFiles. 
* @deprecated use {@link #getInputFileType()} instead http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 057de84..f6e2e72 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -19,6 +19,7 @@ package org.apache.sqoop.mapreduce; import java.io.IOException; +import java.sql.SQLException; import org.apache.avro.file.DataFileConstants; import org.apache.avro.mapred.AvroJob; @@ -44,6 +45,7 @@ import com.cloudera.sqoop.manager.ImportJobContext; import com.cloudera.sqoop.mapreduce.JobBase; import com.cloudera.sqoop.orm.TableClassName; import com.cloudera.sqoop.util.ImportException; +import org.apache.sqoop.validation.*; /** * Base class for running an import MapReduce job. 
@@ -212,6 +214,10 @@ public class ImportJobBase extends JobBase { if (!success) { throw new ImportException("Import job failed!"); } + + if (options.isValidationEnabled()) { + validateImport(tableName, conf, job); + } } catch (InterruptedException ie) { throw new IOException(ie); } catch (ClassNotFoundException cnfe) { @@ -222,6 +228,26 @@ public class ImportJobBase extends JobBase { } } + protected void validateImport(String tableName, Configuration conf, Job job) + throws ImportException { + LOG.debug("Validating imported data."); + try { + ValidationContext validationContext = new ValidationContext( + getRowCountFromDB(context.getConnManager(), tableName), // source + getRowCountFromHadoop(job)); // target + + doValidate(options, conf, validationContext); + } catch (ValidationException e) { + throw new ImportException("Error validating row counts", e); + } catch (SQLException e) { + throw new ImportException("Error retrieving DB source row count", e); + } catch (IOException e) { + throw new ImportException("Error retrieving target row count", e); + } catch (InterruptedException e) { + throw new ImportException("Error retrieving target row count", e); + } + } + /** * Open-ended "setup" routine that is called after the job is configured * but just before it is submitted to MapReduce. 
Subclasses may override http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 1ab3e5f..d795646 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -142,6 +142,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String UPDATE_KEY_ARG = "update-key"; public static final String UPDATE_MODE_ARG = "update-mode"; + // Arguments for validation. + public static final String VALIDATE_ARG = "validate"; + public static final String VALIDATOR_CLASS_ARG = "validator"; + public static final String VALIDATION_THRESHOLD_CLASS_ARG = + "validation-threshold"; + public static final String VALIDATION_FAILURE_HANDLER_CLASS_ARG = + "validation-failurehandler"; + // Arguments for incremental imports. 
public static final String INCREMENT_TYPE_ARG = "incremental"; public static final String INCREMENT_COL_ARG = "check-column"; @@ -619,7 +627,29 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { return hbaseOpts; } - + @SuppressWarnings("static-access") + protected void addValidationOpts(RelatedOptions validationOptions) { + validationOptions.addOption(OptionBuilder + .withDescription("Validate the copy using the configured validator") + .withLongOpt(VALIDATE_ARG) + .create()); + validationOptions.addOption(OptionBuilder + .withArgName(VALIDATOR_CLASS_ARG).hasArg() + .withDescription("Fully qualified class name for the Validator") + .withLongOpt(VALIDATOR_CLASS_ARG) + .create()); + validationOptions.addOption(OptionBuilder + .withArgName(VALIDATION_THRESHOLD_CLASS_ARG).hasArg() + .withDescription("Fully qualified class name for ValidationThreshold") + .withLongOpt(VALIDATION_THRESHOLD_CLASS_ARG) + .create()); + validationOptions.addOption(OptionBuilder + .withArgName(VALIDATION_FAILURE_HANDLER_CLASS_ARG).hasArg() + .withDescription("Fully qualified class name for " + + "ValidationFailureHandler") + .withLongOpt(VALIDATION_FAILURE_HANDLER_CLASS_ARG) + .create()); + } /** * Apply common command-line to the state. 
@@ -885,6 +915,39 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { } } + protected void applyValidationOptions(CommandLine in, SqoopOptions out) + throws InvalidOptionsException { + if (in.hasOption(VALIDATE_ARG)) { + out.setValidationEnabled(true); + } + + // Class Names are converted to Class in light of failing early + if (in.hasOption(VALIDATOR_CLASS_ARG)) { + out.setValidatorClass( + getClassByName(in.getOptionValue(VALIDATOR_CLASS_ARG))); + } + + if (in.hasOption(VALIDATION_THRESHOLD_CLASS_ARG)) { + out.setValidationThresholdClass( + getClassByName(in.getOptionValue(VALIDATION_THRESHOLD_CLASS_ARG))); + } + + if (in.hasOption(VALIDATION_FAILURE_HANDLER_CLASS_ARG)) { + out.setValidationFailureHandlerClass(getClassByName( + in.getOptionValue(VALIDATION_FAILURE_HANDLER_CLASS_ARG))); + } + } + + protected Class<?> getClassByName(String className) + throws InvalidOptionsException { + try { + return Class.forName(className, true, + Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new InvalidOptionsException(e.getMessage()); + } + } + protected void validateCommonOptions(SqoopOptions options) throws InvalidOptionsException { if (options.getConnectString() == null) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/tool/ExportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ExportTool.java b/src/java/org/apache/sqoop/tool/ExportTool.java index 4965e21..acd296d 100644 --- a/src/java/org/apache/sqoop/tool/ExportTool.java +++ b/src/java/org/apache/sqoop/tool/ExportTool.java @@ -177,6 +177,8 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { .withLongOpt(UPDATE_MODE_ARG) .create()); + addValidationOpts(exportOpts); + return exportOpts; } @@ -271,6 +273,7 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { out.setClearStagingTable(true); 
} + applyValidationOptions(in, out); applyNewUpdateOptions(in, out); applyInputFormatOptions(in, out); applyOutputFormatOptions(in, out); http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/tool/ImportTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index 932e39b..10f0cb9 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -508,6 +508,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { * @return the RelatedOptions that can be used to parse the import * arguments. */ + @SuppressWarnings("static-access") protected RelatedOptions getImportOptions() { // Imports RelatedOptions importOpts = new RelatedOptions("Import control arguments"); @@ -554,6 +555,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { + " value of the primary key") .withLongOpt(SQL_QUERY_BOUNDARY) .create()); + + addValidationOpts(importOpts); } importOpts.addOption(OptionBuilder.withArgName("dir") @@ -756,6 +759,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { if (in.hasOption(SQL_QUERY_BOUNDARY)) { out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY)); } + + applyValidationOptions(in, out); } if (in.hasOption(WAREHOUSE_DIR_ARG)) { @@ -873,6 +878,20 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool { throw new InvalidOptionsException( "Direct import currently do not support dropping hive delimiters," + " please remove parameter --hive-drop-import-delims."); + } else if (allTables && options.isValidationEnabled()) { + throw new InvalidOptionsException("Validation is not supported for " + + "all tables but single table only."); + } else if (options.getSqlQuery() != null && options.isValidationEnabled()) { + throw new InvalidOptionsException("Validation is not 
supported for " + + "free from query but single table only."); + } else if (options.getWhereClause() != null + && options.isValidationEnabled()) { + throw new InvalidOptionsException("Validation is not supported for " + + "where clause but single table only."); + } else if (options.getIncrementalMode() + != SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) { + throw new InvalidOptionsException("Validation is not supported for " + + "incremental imports but single table only."); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/AbsoluteValidationThreshold.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/AbsoluteValidationThreshold.java b/src/java/org/apache/sqoop/validation/AbsoluteValidationThreshold.java new file mode 100644 index 0000000..c068ba8 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/AbsoluteValidationThreshold.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.validation; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A specific implementation of ValidationThreshold that validates based on + * two values being the same. + * + * This is used as the default ValidationThreshold implementation unless + * overridden in configuration. + */ +public class AbsoluteValidationThreshold implements ValidationThreshold { + + private static final Log LOG = + LogFactory.getLog(AbsoluteValidationThreshold.class.getName()); + + @Override + public void setThresholdValue(long value) { + } + + static final ValidationThreshold INSTANCE = new AbsoluteValidationThreshold(); + + @Override + @SuppressWarnings("unchecked") + public boolean compare(Comparable left, Comparable right) { + LOG.debug("Absolute Validation threshold comparing " + + left + " with " + right); + + return (Math.abs(left.compareTo(right)) == 0); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/LogOnFailureHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/LogOnFailureHandler.java b/src/java/org/apache/sqoop/validation/LogOnFailureHandler.java new file mode 100644 index 0000000..3ded6ad --- /dev/null +++ b/src/java/org/apache/sqoop/validation/LogOnFailureHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A specific implementation of ValidationFailureHandler that logs the failure + * message and the reason with the configured logger. + * + * This is used as the default handler unless overridden in configuration. + */ +public class LogOnFailureHandler implements ValidationFailureHandler { + private static final Log LOG = + LogFactory.getLog(LogOnFailureHandler.class.getName()); + + static final ValidationFailureHandler INSTANCE = new LogOnFailureHandler(); + + @Override + public boolean handle(ValidationContext context) throws ValidationException { + LOG.warn(context.getMessage() + ", Reason: " + context.getReason()); + return true; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/RowCountValidator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/RowCountValidator.java b/src/java/org/apache/sqoop/validation/RowCountValidator.java new file mode 100644 index 0000000..2896192 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/RowCountValidator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A specific implementation of validator that validates data copied, + * either import or export, using row counts from the data source and + * the target systems. + * + * This is used as the default validator unless overridden in configuration. + */ +public class RowCountValidator implements Validator { + + public static final Log LOG = LogFactory.getLog( + RowCountValidator.class.getName()); + + @Override + public boolean validate(ValidationContext context) + throws ValidationException { + return validate(context, + AbsoluteValidationThreshold.INSTANCE, LogOnFailureHandler.INSTANCE); + } + + @Override + public boolean validate(ValidationContext validationContext, + ValidationThreshold validationThreshold, + ValidationFailureHandler validationFailureHandler) + throws ValidationException { + LOG.debug("Validating data using row counts: Source [" + + validationContext.getSourceRowCount() + "] with Target[" + + validationContext.getTargetRowCount() + "]"); + + if (validationThreshold.compare(validationContext.getSourceRowCount(), + validationContext.getTargetRowCount())) { + return true; + } + + validationContext.setMessage(this.getClass().getSimpleName()); + validationContext.setReason("The expected counter value was " + + 
validationContext.getSourceRowCount() + " but the actual value was " + + validationContext.getTargetRowCount()); + + return validationFailureHandler.handle(validationContext); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/ValidationContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/ValidationContext.java b/src/java/org/apache/sqoop/validation/ValidationContext.java new file mode 100644 index 0000000..4863b71 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/ValidationContext.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +/** + * This object encapsulates the context for the validation framework. + * Before validation, the row counts are stored. Post validation, + * the message and failure reason are captured. 
+ */ +public class ValidationContext { + private final long sourceRowCount; + private final long targetRowCount; + + private String message; + private String reason; + + public ValidationContext(long sourceRowCount, long targetRowCount) { + this.sourceRowCount = sourceRowCount; + this.targetRowCount = targetRowCount; + } + + public String getMessage() { + return message; + } + + public void setMessage(String aMessage) { + this.message = aMessage; + } + + public String getReason() { + return reason; + } + + public void setReason(String aReason) { + this.reason = aReason; + } + + public long getSourceRowCount() { + return sourceRowCount; + } + + public long getTargetRowCount() { + return targetRowCount; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/ValidationException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/ValidationException.java b/src/java/org/apache/sqoop/validation/ValidationException.java new file mode 100644 index 0000000..0c10241 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/ValidationException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +/** + * An implementation of Exception that is used to propagate + * validation related errors or failures. + */ +public class ValidationException extends Exception { + + public ValidationException(String s, Throwable throwable) { + super(s, throwable); + } + + @Override + public String toString() { + String msg = getMessage(); + return (null == msg) ? "ValidationException" : msg; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/ValidationFailureHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/ValidationFailureHandler.java b/src/java/org/apache/sqoop/validation/ValidationFailureHandler.java new file mode 100644 index 0000000..89a4084 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/ValidationFailureHandler.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.validation; + +/** + * This is the primary interface that dictates + * how validation failures are handled. + */ +public interface ValidationFailureHandler { + + /** + * Method that handles the validation failure. + * + * @param validationContext validation context + * @return if failure was handled or not + * @throws ValidationException + */ + boolean handle(ValidationContext validationContext) + throws ValidationException; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/ValidationThreshold.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/ValidationThreshold.java b/src/java/org/apache/sqoop/validation/ValidationThreshold.java new file mode 100644 index 0000000..6013d62 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/ValidationThreshold.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +/** + * This is the primary interface that is responsible for driving the actual + * decision on validation based on an optional error margin threshold.
+ */ +public interface ValidationThreshold { + + void setThresholdValue(long value); + + boolean compare(Comparable left, Comparable right); +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/java/org/apache/sqoop/validation/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/validation/Validator.java b/src/java/org/apache/sqoop/validation/Validator.java new file mode 100644 index 0000000..1089a95 --- /dev/null +++ b/src/java/org/apache/sqoop/validation/Validator.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +/** + * This represents the primary interface that drives the validation logic + * by delegating the decision to ValidationThreshold and failure handling + * to ValidationFailureHandler. Uses ValidationContext to encapsulate + * the various required parameters. + */ +public interface Validator { + + /** + * Method to validate the data copy with default implementations + * for ValidationThreshold and ValidationFailureHandler. 
+ * + * @param validationContext validation context + * @return if validation was successful or not + * @throws ValidationException + */ + boolean validate(ValidationContext validationContext) + throws ValidationException; + + /** + * Method to validate the data copy with specific implementations + * for ValidationThreshold and ValidationFailureHandler. + * + * @param validationContext validation context + * @param validationThreshold specific implementation of ValidationThreshold + * @param validationFailureHandler specific implementation of + * ValidationFailureHandler + * @return if validation was successful or not + * @throws ValidationException + */ + boolean validate(ValidationContext validationContext, + ValidationThreshold validationThreshold, + ValidationFailureHandler validationFailureHandler) + throws ValidationException; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/0b465594/src/test/org/apache/sqoop/validation/RowCountValidatorImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/validation/RowCountValidatorImportTest.java b/src/test/org/apache/sqoop/validation/RowCountValidatorImportTest.java new file mode 100644 index 0000000..a53e281 --- /dev/null +++ b/src/test/org/apache/sqoop/validation/RowCountValidatorImportTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.validation; + +import com.cloudera.sqoop.testutil.ImportJobTestCase; +import org.apache.hadoop.conf.Configuration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Tests for RowCountValidator. + */ +public class RowCountValidatorImportTest extends ImportJobTestCase { + + protected List<String> getExtraArgs(Configuration conf) { + ArrayList<String> list = new ArrayList<String>(1); + list.add("--validate"); + return list; + } + + /** + * Test the implementation for AbsoluteValidationThreshold. + * Both arguments should be the same, else the comparison fails. + */ + public void testAbsoluteValidationThreshold() { + ValidationThreshold validationThreshold = new AbsoluteValidationThreshold(); + assertTrue(validationThreshold.compare(100, 100)); + assertFalse(validationThreshold.compare(100, 90)); + assertFalse(validationThreshold.compare(90, 100)); + } + + /** + * Test if the --validate flag actually made it through the options.
+ * + * @throws Exception + */ + public void testValidateOptionIsEnabled() throws Exception { + String[] types = {"INT NOT NULL PRIMARY KEY", "VARCHAR(32)", "VARCHAR(32)"}; + String[] insertVals = {"1", "'Bob'", "'sales'"}; + + try { + createTableWithColTypes(types, insertVals); + + String[] args = getArgv(true, null, getConf()); + ArrayList<String> argsList = new ArrayList<String>(); + Collections.addAll(argsList, args); + assertTrue("Validate option missing.", argsList.contains("--validate")); + } finally { + dropTableIfExists(getTableName()); + } + } + + /** + * Test the validation for a sample import, positive case. + * + * @throws Exception + */ + public void testValidatorForImportTable() throws Exception { + String[] types = {"INT NOT NULL PRIMARY KEY", "VARCHAR(32)", "VARCHAR(32)"}; + String[] insertVals = {"1", "'Bob'", "'sales'"}; + String validateLine = "1,Bob,sales"; + + try { + createTableWithColTypes(types, insertVals); + + verifyImport(validateLine, null); + LOG.debug("Verified input line as " + validateLine + " -- ok!"); + } finally { + dropTableIfExists(getTableName()); + } + } +}
