SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR (no functionality change)
(Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cb821480 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cb821480 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cb821480 Branch: refs/heads/sqoop2 Commit: cb8214806b4e47dc3ad30d5bff0a42b04a412a06 Parents: 68577fb Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 14 17:03:12 2014 -0400 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 14 17:03:12 2014 -0400 ---------------------------------------------------------------------- .../apache/sqoop/job/etl/ExtractorContext.java | 1 - .../mapreduce/MapreduceExecutionEngine.java | 16 +- .../java/org/apache/sqoop/job/JobConstants.java | 81 ------ .../org/apache/sqoop/job/MRExecutionError.java | 97 +++++++ .../org/apache/sqoop/job/MRJobConstants.java | 81 ++++++ .../sqoop/job/MapreduceExecutionError.java | 97 ------- .../main/java/org/apache/sqoop/job/io/Data.java | 26 +- .../apache/sqoop/job/mr/ConfigurationUtils.java | 278 ------------------- .../sqoop/job/mr/MRConfigurationUtils.java | 278 +++++++++++++++++++ .../apache/sqoop/job/mr/ProgressRunnable.java | 45 --- .../sqoop/job/mr/SqoopDestroyerExecutor.java | 16 +- .../sqoop/job/mr/SqoopFileOutputFormat.java | 8 +- .../apache/sqoop/job/mr/SqoopInputFormat.java | 18 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 26 +- .../sqoop/job/mr/SqoopNullOutputFormat.java | 2 +- .../job/mr/SqoopOutputFormatLoadExecutor.java | 28 +- .../sqoop/job/mr/SqoopProgressRunnable.java | 45 +++ .../org/apache/sqoop/job/mr/SqoopReducer.java | 4 +- .../org/apache/sqoop/job/mr/SqoopSplit.java | 6 +- .../org/apache/sqoop/job/TestMapReduce.java | 32 +-- .../java/org/apache/sqoop/job/TestMatching.java | 12 +- .../apache/sqoop/job/io/SqoopWritableTest.java | 2 +- .../sqoop/job/mr/TestConfigurationUtils.java | 168 ----------- .../sqoop/job/mr/TestMRConfigurationUtils.java | 168 +++++++++++ 
.../mr/TestSqoopOutputFormatLoadExecutor.java | 10 +- .../mapreduce/MapreduceSubmissionEngine.java | 22 +- 26 files changed, 783 insertions(+), 784 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java index 3272b56..4875ed0 100644 --- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java +++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java @@ -19,7 +19,6 @@ package org.apache.sqoop.job.etl; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.etl.io.DataWriter; -import org.apache.sqoop.schema.Schema; /** * Context implementation for Extractor. http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java index 47f8478..9b3eb44 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java @@ -21,7 +21,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.driver.ExecutionEngine; import org.apache.sqoop.driver.JobRequest; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import 
org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.To; import org.apache.sqoop.job.io.SqoopWritable; @@ -64,16 +64,16 @@ public class MapreduceExecutionEngine extends ExecutionEngine { From from = (From) mrJobRequest.getFrom(); To to = (To) mrJobRequest.getTo(); MutableMapContext context = mrJobRequest.getDriverContext(); - context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName()); - context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName()); - context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName()); - context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName()); - context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName()); - context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT, + context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName()); + context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName()); + context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName()); + context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName()); + context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName()); + context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT, mrJobRequest.getIntermediateDataFormat().getName()); if(mrJobRequest.getExtractors() != null) { - context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors()); + context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors()); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java deleted file mode 100644 index 349bb60..0000000 
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job; - -import org.apache.sqoop.core.ConfigurationConstants; -import org.apache.sqoop.driver.DriverConstants; - -public final class JobConstants extends Constants { - /** - * All job related configuration is prefixed with this: - * <tt>org.apache.sqoop.job.</tt> - */ - public static final String PREFIX_JOB_CONFIG = - ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job."; - - public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG - + "etl.partitioner"; - - public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG - + "etl.extractor"; - - public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG - + "etl.loader"; - - public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG - + "etl.from.destroyer"; - - public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG - + "etl.to.destroyer"; - - public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG - + "mr.output.file"; - - public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG - + "mr.output.codec"; - - - public static 
final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG - + "etl.extractor.count"; - - public static final String PREFIX_CONNECTOR_FROM_CONTEXT = - PREFIX_JOB_CONFIG + "connector.from.context."; - - public static final String PREFIX_CONNECTOR_TO_CONTEXT = - PREFIX_JOB_CONFIG + "connector.to.context."; - - // Hadoop specific constants - // We're using constants from Hadoop 1. Hadoop 2 has different names, but - // provides backward compatibility layer for those names as well. - - public static final String HADOOP_INPUTDIR = "mapred.input.dir"; - - public static final String HADOOP_OUTDIR = "mapred.output.dir"; - - public static final String HADOOP_COMPRESS = "mapred.output.compress"; - - public static final String HADOOP_COMPRESS_CODEC = - "mapred.output.compression.codec"; - - public static final String INTERMEDIATE_DATA_FORMAT = - DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format"; - - private JobConstants() { - // Disable explicit object creation - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java new file mode 100644 index 0000000..e70b7e2 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import org.apache.sqoop.common.ErrorCode; + +/** + * + */ +public enum MRExecutionError implements ErrorCode { + + MAPRED_EXEC_0000("Unknown error"), + + /** Error occurs during job execution. */ + MAPRED_EXEC_0008("Error occurs during job execution"), + + /** The system was unable to load the specified class. */ + MAPRED_EXEC_0009("Unable to load the specified class"), + + /** The system was unable to instantiate the specified class. */ + MAPRED_EXEC_0010("Unable to instantiate the specified class"), + + /** The parameter already exists in the context */ + MAPRED_EXEC_0011("The parameter already exists in the context"), + + /** The type is not supported */ + MAPRED_EXEC_0012("The type is not supported"), + + /** Cannot write to the data writer */ + MAPRED_EXEC_0013("Cannot write to the data writer"), + + /** Cannot read from the data reader */ + MAPRED_EXEC_0014("Cannot read to the data reader"), + + /** Unable to write data due to interrupt */ + MAPRED_EXEC_0015("Unable to write data due to interrupt"), + + /** Unable to read data due to interrupt */ + MAPRED_EXEC_0016("Unable to read data due to interrupt"), + + /** Error occurs during extractor run */ + MAPRED_EXEC_0017("Error occurs during extractor run"), + + /** Error occurs during loader run */ + MAPRED_EXEC_0018("Error occurs during loader run"), + + MAPRED_EXEC_0019("Data have not been completely consumed yet"), + + /** The required option has not been set yet */ + MAPRED_EXEC_0020("The required option has not been set yet"), + + /** Error occurs during 
partitioner run */ + MAPRED_EXEC_0021("Error occurs during partitioner run"), + + /** Unable to parse because it is not properly delimited */ + MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"), + + /** Unknown job type */ + MAPRED_EXEC_0023("Unknown job type"), + + /** Unsupported output format type found **/ + MAPRED_EXEC_0024("Unknown output format type"), + + /** Got invalid number of partitions from Partitioner */ + MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"), + + ; + + private final String message; + + private MRExecutionError(String message) { + this.message = message; + } + + public String getCode() { + return name(); + } + + public String getMessage() { + return message; + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java new file mode 100644 index 0000000..67021a8 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job; + +import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.driver.DriverConstants; + +public final class MRJobConstants extends Constants { + /** + * All job related configuration is prefixed with this: + * <tt>org.apache.sqoop.job.</tt> + */ + public static final String PREFIX_JOB_CONFIG = + ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job."; + + public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG + + "etl.partitioner"; + + public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG + + "etl.extractor"; + + public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG + + "etl.loader"; + + public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG + + "etl.from.destroyer"; + + public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG + + "etl.to.destroyer"; + + public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG + + "mr.output.file"; + + public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG + + "mr.output.codec"; + + + public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG + + "etl.extractor.count"; + + public static final String PREFIX_CONNECTOR_FROM_CONTEXT = + PREFIX_JOB_CONFIG + "connector.from.context."; + + public static final String PREFIX_CONNECTOR_TO_CONTEXT = + PREFIX_JOB_CONFIG + "connector.to.context."; + + // Hadoop specific constants + // We're using constants from Hadoop 1. Hadoop 2 has different names, but + // provides backward compatibility layer for those names as well. 
+ + public static final String HADOOP_INPUTDIR = "mapred.input.dir"; + + public static final String HADOOP_OUTDIR = "mapred.output.dir"; + + public static final String HADOOP_COMPRESS = "mapred.output.compress"; + + public static final String HADOOP_COMPRESS_CODEC = + "mapred.output.compression.codec"; + + public static final String INTERMEDIATE_DATA_FORMAT = + DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format"; + + private MRJobConstants() { + // Disable explicit object creation + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java deleted file mode 100644 index 1dc12d1..0000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.job; - -import org.apache.sqoop.common.ErrorCode; - -/** - * - */ -public enum MapreduceExecutionError implements ErrorCode { - - MAPRED_EXEC_0000("Unknown error"), - - /** Error occurs during job execution. */ - MAPRED_EXEC_0008("Error occurs during job execution"), - - /** The system was unable to load the specified class. */ - MAPRED_EXEC_0009("Unable to load the specified class"), - - /** The system was unable to instantiate the specified class. */ - MAPRED_EXEC_0010("Unable to instantiate the specified class"), - - /** The parameter already exists in the context */ - MAPRED_EXEC_0011("The parameter already exists in the context"), - - /** The type is not supported */ - MAPRED_EXEC_0012("The type is not supported"), - - /** Cannot write to the data writer */ - MAPRED_EXEC_0013("Cannot write to the data writer"), - - /** Cannot read from the data reader */ - MAPRED_EXEC_0014("Cannot read to the data reader"), - - /** Unable to write data due to interrupt */ - MAPRED_EXEC_0015("Unable to write data due to interrupt"), - - /** Unable to read data due to interrupt */ - MAPRED_EXEC_0016("Unable to read data due to interrupt"), - - /** Error occurs during extractor run */ - MAPRED_EXEC_0017("Error occurs during extractor run"), - - /** Error occurs during loader run */ - MAPRED_EXEC_0018("Error occurs during loader run"), - - MAPRED_EXEC_0019("Data have not been completely consumed yet"), - - /** The required option has not been set yet */ - MAPRED_EXEC_0020("The required option has not been set yet"), - - /** Error occurs during partitioner run */ - MAPRED_EXEC_0021("Error occurs during partitioner run"), - - /** Unable to parse because it is not properly delimited */ - MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"), - - /** Unknown job type */ - MAPRED_EXEC_0023("Unknown job type"), - - /** Unsupported output format type found **/ - MAPRED_EXEC_0024("Unknown output format type"), - - /** Got invalid number of 
partitions from Partitioner */ - MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"), - - ; - - private final String message; - - private MapreduceExecutionError(String message) { - this.message = message; - } - - public String getCode() { - return name(); - } - - public String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java index 5423b7b..139883e 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java @@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.MRExecutionError; public class Data implements WritableComparable<Data> { @@ -76,7 +76,7 @@ public class Data implements WritableComparable<Data> { this.content = content; break; default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -87,7 +87,7 @@ public class Data implements WritableComparable<Data> { case ARRAY_RECORD: return parse(); default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType)); } } @@ -141,7 +141,7 @@ public class Data implements WritableComparable<Data> { } return result; default: - throw new 
SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -156,7 +156,7 @@ public class Data implements WritableComparable<Data> { readArray(in); break; default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -171,7 +171,7 @@ public class Data implements WritableComparable<Data> { writeArray(out); break; default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -249,7 +249,7 @@ public class Data implements WritableComparable<Data> { default: throw new IOException( - new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) + new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type)) ); } } @@ -307,7 +307,7 @@ public class Data implements WritableComparable<Data> { } else { throw new IOException( - new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, + new SqoopException(MRExecutionError.MAPRED_EXEC_0012, array[i].getClass().getName() ) ); @@ -351,7 +351,7 @@ public class Data implements WritableComparable<Data> { return sb.toString(); default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -399,7 +399,7 @@ public class Data implements WritableComparable<Data> { return (Object[])content; default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type)); } } @@ -418,7 +418,7 @@ public class Data implements WritableComparable<Data> { case FieldTypes.UTF: if (field.charAt(0) != 
stringDelimiter || field.charAt(field.length()-1) != stringDelimiter) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); } list.add(index, unescape(field.substring(1, field.length()-1))); break; @@ -426,7 +426,7 @@ public class Data implements WritableComparable<Data> { case FieldTypes.BIN: if (field.charAt(0) != '[' || field.charAt(field.length()-1) != ']') { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022); } String[] splits = field.substring(1, field.length()-1).split(String.valueOf(',')); @@ -474,7 +474,7 @@ public class Data implements WritableComparable<Data> { break; default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType)); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType)); } return ++index; http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java deleted file mode 100644 index 0fa07f7..0000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.mr; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.log4j.PropertyConfigurator; -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.json.util.SchemaSerialization; -import org.apache.sqoop.model.ConfigUtils; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.utils.ClassUtils; -import org.json.simple.JSONObject; -import org.json.simple.JSONValue; - -import java.io.InputStream; -import java.util.Properties; - -/** - * Helper class to store and load various information in/from MapReduce configuration - * object and JobConf object. 
- */ -public final class ConfigurationUtils { - - private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link"; - - private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link"; - - private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job"; - - private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job"; - - private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = JobConstants.PREFIX_JOB_CONFIG + "config.class.driver"; - - private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link"; - - private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK); - - private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link"; - - private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK); - - private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job"; - - private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG); - - private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job"; - - private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG); - - private static final String MR_JOB_CONFIG_DRIVER_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.driver"; - - private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG); - - private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from"; - - 
private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM); - - private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to"; - - private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO); - - - /** - * Persist Connector configuration object for link. - * - * @param job MapReduce job object - * @param obj Configuration object - */ - public static void setConnectorLinkConfig(Direction type, Job job, Object obj) { - switch (type) { - case FROM: - job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName()); - job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes()); - break; - - case TO: - job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName()); - job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes()); - break; - } - } - - /** - * Persist Connector configuration objects for job. - * - * @param job MapReduce job object - * @param obj Configuration object - */ - public static void setConnectorJobConfig(Direction type, Job job, Object obj) { - switch (type) { - case FROM: - job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName()); - job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); - break; - - case TO: - job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName()); - job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); - break; - } - } - - - /** - * Persist driver configuration object for job. 
- * - * @param job MapReduce job object - * @param obj Configuration object - */ - public static void setDriverConfig(Job job, Object obj) { - job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName()); - job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); - } - - /** - * Persist Connector generated schema. - * - * @param type Direction of schema we are persisting - * @param job MapReduce Job object - * @param schema Schema - */ - public static void setConnectorSchema(Direction type, Job job, Schema schema) { - if(schema != null) { - String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString(); - switch (type) { - case FROM: - job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes()); - return; - case TO: - job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes()); - return; - } - } - } - - /** - * Retrieve Connector configuration object for connection. - * @param configuration MapReduce configuration object - * @return Configuration object - */ - public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) { - switch (type) { - case FROM: - return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY); - - case TO: - return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY); - } - - return null; - } - - /** - * Retrieve Connector configuration object for job. 
- * - * @param configuration MapReduce configuration object - * @return Configuration object - */ - public static Object getConnectorJobConfig(Direction type, Configuration configuration) { - switch (type) { - case FROM: - return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY); - - case TO: - return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY); - } - - return null; - } - - /** - * Retrieve Framework configuration object for job. - * - * @param configuration MapReduce configuration object - * @return Configuration object - */ - public static Object getDriverConfig(Configuration configuration) { - return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY); - } - - - - /** - * Retrieve Connector generated schema. - * - * @param type The FROM or TO connector - * @param configuration MapReduce configuration object - */ - public static Schema getConnectorSchema(Direction type, Configuration configuration) { - switch (type) { - case FROM: - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY)); - - case TO: - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY)); - } - - return null; - } - - /** - * Deserialize schema from JSON encoded bytes. - * - * This method is null safe. - * - * @param bytes - * @return - */ - private static Schema getSchemaFromBytes(byte[] bytes) { - if(bytes == null) { - return null; - } - - JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes)); - return SchemaSerialization.restoreSchema(jsonSchema); - } - - /** - * Load configuration instance serialized in Hadoop credentials cache. 
- * - * @param configuration JobConf object associated with the job - * @param classProperty Property with stored configuration class name - * @param valueProperty Property with stored JSON representation of the - * configuration object - * @return New instance with loaded data - */ - private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) { - // Create new instance of configuration class - Object object = ClassUtils.instantiate(configuration.get(classProperty)); - if(object == null) { - return null; - } - - String json = new String(configuration.getCredentials().getSecretKey(valueProperty)); - - // Fill it with JSON data - ConfigUtils.fillValues(json, object); - - // And give it back - return object; - } - - private ConfigurationUtils() { - // Instantiation is prohibited - } - - public static void configureLogging() { - try { - Properties props = new Properties(); - InputStream resourceAsStream = - SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties"); - props.load(resourceAsStream); - PropertyConfigurator.configure(props); - } catch (Exception e) { - System.err.println("Encountered exception while configuring logging " + - "for sqoop: " + e); - } - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java new file mode 100644 index 0000000..03a1dec --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.PropertyConfigurator; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.json.util.SchemaSerialization; +import org.apache.sqoop.model.ConfigUtils; +import org.apache.sqoop.schema.Schema; +import org.apache.sqoop.utils.ClassUtils; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; + +import java.io.InputStream; +import java.util.Properties; + +/** + * Helper class to store and load various information in/from MapReduce configuration + * object and JobConf object. 
+ */ +public final class MRConfigurationUtils { + + private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link"; + + private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link"; + + private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job"; + + private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job"; + + private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.driver"; + + private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link"; + + private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK); + + private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link"; + + private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK); + + private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job"; + + private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG); + + private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job"; + + private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG); + + private static final String MR_JOB_CONFIG_DRIVER_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.driver"; + + private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG); + + private static final String SCHEMA_FROM = MRJobConstants.PREFIX_JOB_CONFIG + 
"schema.connector.from"; + + private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM); + + private static final String SCHEMA_TO = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.to"; + + private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO); + + + /** + * Persist Connector configuration object for link. + * + * @param job MapReduce job object + * @param obj Configuration object + */ + public static void setConnectorLinkConfig(Direction type, Job job, Object obj) { + switch (type) { + case FROM: + job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName()); + job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes()); + break; + + case TO: + job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName()); + job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes()); + break; + } + } + + /** + * Persist Connector configuration objects for job. + * + * @param job MapReduce job object + * @param obj Configuration object + */ + public static void setConnectorJobConfig(Direction type, Job job, Object obj) { + switch (type) { + case FROM: + job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName()); + job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); + break; + + case TO: + job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName()); + job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); + break; + } + } + + + /** + * Persist driver configuration object for job. 
+ * + * @param job MapReduce job object + * @param obj Configuration object + */ + public static void setDriverConfig(Job job, Object obj) { + job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName()); + job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes()); + } + + /** + * Persist Connector generated schema. + * + * @param type Direction of schema we are persisting + * @param job MapReduce Job object + * @param schema Schema + */ + public static void setConnectorSchema(Direction type, Job job, Schema schema) { + if(schema != null) { + String jsonSchema = SchemaSerialization.extractSchema(schema).toJSONString(); + switch (type) { + case FROM: + job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes()); + return; + case TO: + job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes()); + return; + } + } + } + + /** + * Retrieve Connector configuration object for connection. + * @param configuration MapReduce configuration object + * @return Configuration object + */ + public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) { + switch (type) { + case FROM: + return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY); + + case TO: + return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY); + } + + return null; + } + + /** + * Retrieve Connector configuration object for job. 
+ * + * @param configuration MapReduce configuration object + * @return Configuration object + */ + public static Object getConnectorJobConfig(Direction type, Configuration configuration) { + switch (type) { + case FROM: + return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY); + + case TO: + return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY); + } + + return null; + } + + /** + * Retrieve Framework configuration object for job. + * + * @param configuration MapReduce configuration object + * @return Configuration object + */ + public static Object getDriverConfig(Configuration configuration) { + return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY); + } + + + + /** + * Retrieve Connector generated schema. + * + * @param type The FROM or TO connector + * @param configuration MapReduce configuration object + */ + public static Schema getConnectorSchema(Direction type, Configuration configuration) { + switch (type) { + case FROM: + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY)); + + case TO: + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY)); + } + + return null; + } + + /** + * Deserialize schema from JSON encoded bytes. + * + * This method is null safe. + * + * @param bytes + * @return + */ + private static Schema getSchemaFromBytes(byte[] bytes) { + if(bytes == null) { + return null; + } + + JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes)); + return SchemaSerialization.restoreSchema(jsonSchema); + } + + /** + * Load configuration instance serialized in Hadoop credentials cache. 
+ * + * @param configuration JobConf object associated with the job + * @param classProperty Property with stored configuration class name + * @param valueProperty Property with stored JSON representation of the + * configuration object + * @return New instance with loaded data + */ + private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) { + // Create new instance of configuration class + Object object = ClassUtils.instantiate(configuration.get(classProperty)); + if(object == null) { + return null; + } + + String json = new String(configuration.getCredentials().getSecretKey(valueProperty)); + + // Fill it with JSON data + ConfigUtils.fillValues(json, object); + + // And give it back + return object; + } + + private MRConfigurationUtils() { + // Instantiation is prohibited + } + + public static void configureLogging() { + try { + Properties props = new Properties(); + InputStream resourceAsStream = + SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties"); + props.load(resourceAsStream); + PropertyConfigurator.configure(props); + } catch (Exception e) { + System.err.println("Encountered exception while configuring logging " + + "for sqoop: " + e); + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java deleted file mode 100644 index 4c2e206..0000000 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.mr; - -import org.apache.hadoop.mapreduce.TaskInputOutputContext; -import org.apache.log4j.Logger; - - -/** - * Runnable that will ping mapreduce context about progress. - */ -public class ProgressRunnable implements Runnable { - - public static final Logger LOG = Logger.getLogger(ProgressRunnable.class); - - /** - * Context class that we should use for reporting progress. 
- */ - private final TaskInputOutputContext<?,?,?,?> context; - - public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) { - this.context = ctxt; - } - - @Override - public void run() { - LOG.debug("Auto-progress thread reporting progress"); - this.context.progress(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index 8d2a1da..b385926 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; @@ -47,13 +47,13 @@ public class SqoopDestroyerExecutor { switch (direction) { default: case FROM: - destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER; - prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT; + destroyerPropertyName = MRJobConstants.JOB_ETL_FROM_DESTROYER; + prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT; break; case TO: - destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER; - prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT; + destroyerPropertyName = MRJobConstants.JOB_ETL_TO_DESTROYER; + prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT; break; } @@ -66,11 +66,11 @@ public class 
SqoopDestroyerExecutor { // Objects that should be pass to the Destroyer execution PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName); - Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration); - Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration); + Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration); + Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration); // Propagate connector schema in every case for now - Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration); + Schema schema = MRConfigurationUtils.getConnectorSchema(direction, configuration); DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java index ca77e16..f451044 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.io.SqoopWritable; /** @@ -56,13 +56,13 @@ public class SqoopFileOutputFormat Path filepath = getDefaultWorkFile(context, ""); String filename = filepath.toString(); - 
conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename); + conf.set(MRJobConstants.JOB_MR_OUTPUT_FILE, filename); boolean isCompressed = getCompressOutput(context); if (isCompressed) { String codecname = - conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName()); - conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname); + conf.get(MRJobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName()); + conf.set(MRJobConstants.JOB_MR_OUTPUT_CODEC, codecname); } return new SqoopOutputFormatLoadExecutor(context).getRecordWriter(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index 1c1133a..d2cf5e4 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -31,8 +31,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.job.MRExecutionError; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; @@ -59,15 +59,15 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER); + String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER); Partitioner partitioner = 
(Partitioner) ClassUtils.instantiate(partitionerName); - PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); - Object connectorJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); - Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf); + PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); + Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); + Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); - long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10); + long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10); PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema); List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob); @@ -80,7 +80,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { } if(splits.size() > maxPartitions) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025, + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025, String.format("Got %d, max was %d", splits.size(), maxPartitions)); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 03d84d4..d31aa20 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java 
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -31,8 +31,8 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.matcher.Matcher; import org.apache.sqoop.connector.matcher.MatcherFactory; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.job.MRExecutionError; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; @@ -47,7 +47,7 @@ import org.apache.sqoop.utils.ClassUtils; public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> { static { - ConfigurationUtils.configureLogging(); + MRConfigurationUtils.configureLogging(); } public static final Logger LOG = Logger.getLogger(SqoopMapper.class); @@ -63,14 +63,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, public void run(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR); + String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR); Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName); matcher = MatcherFactory.getMatcher( - ConfigurationUtils.getConnectorSchema(Direction.FROM, conf), - ConfigurationUtils.getConnectorSchema(Direction.TO, conf)); + MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf), + MRConfigurationUtils.getConnectorSchema(Direction.TO, conf)); - String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT); + String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT); fromDataFormat = (IntermediateDataFormat<String>) ClassUtils .instantiate(intermediateDataFormatName); 
fromDataFormat.setSchema(matcher.getFromSchema()); @@ -79,16 +79,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, toDataFormat.setSchema(matcher.getToSchema()); // Objects that should be passed to the Executor execution - PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); - Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); + PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); + Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context)); try { LOG.info("Starting progress service"); - progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES); LOG.info("Running extractor class " + extractorName); extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition()); @@ -96,7 +96,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, context.getCounter(SqoopCounters.ROWS_READ) .increment(extractor.getRowsRead()); } catch (Exception e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e); } finally { LOG.info("Stopping progress service"); progressService.shutdown(); @@ -145,7 +145,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, writable.setString(toDataFormat.getTextData()); context.write(writable, NullWritable.get()); } catch (Exception e) { - throw new 
SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e); } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java index 594b5e9..1148c4a 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java @@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.io.SqoopWritable; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 1ebd3e4..9108981 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -36,8 +36,8 @@ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.matcher.Matcher; import 
org.apache.sqoop.connector.matcher.MatcherFactory; -import org.apache.sqoop.job.JobConstants; -import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.MRJobConstants; +import org.apache.sqoop.job.MRExecutionError; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; @@ -75,10 +75,10 @@ public class SqoopOutputFormatLoadExecutor { context = jobctx; writer = new SqoopRecordWriter(); matcher = MatcherFactory.getMatcher( - ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()), - ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration())); + MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()), + MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration())); dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context - .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT)); + .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT)); dataFormat.setSchema(matcher.getToSchema()); } @@ -141,7 +141,7 @@ public class SqoopOutputFormatLoadExecutor { //In the rare case, it was not a SqoopException Throwables.propagate(t); } catch (Exception ex) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex); } } @@ -186,7 +186,7 @@ public class SqoopOutputFormatLoadExecutor { } catch (Throwable t) { readerFinished = true; LOG.error("Caught exception e while getting content ", t); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t); } finally { releaseSema(); } @@ -221,7 +221,7 @@ public class SqoopOutputFormatLoadExecutor { Configuration conf = null; if (!isTest) { conf = context.getConfiguration(); - loaderName = conf.get(JobConstants.JOB_ETL_LOADER); + loaderName = 
conf.get(MRJobConstants.JOB_ETL_LOADER); } Loader loader = (Loader) ClassUtils.instantiate(loaderName); @@ -233,11 +233,11 @@ public class SqoopOutputFormatLoadExecutor { if (!isTest) { // Using the TO schema since the IDF returns data in TO schema - schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf); + schema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf); - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT); - configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf); - configJob = ConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); + subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); + configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf); + configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); } // Create loader context @@ -252,7 +252,7 @@ public class SqoopOutputFormatLoadExecutor { // Release so that the writer can tell Sqoop something went // wrong. free.release(); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t); } // if no exception happens yet and reader finished before writer, @@ -264,7 +264,7 @@ public class SqoopOutputFormatLoadExecutor { // Release so that the writer can tell Sqoop something went // wrong. 
free.release(); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019); } // inform writer that reader is finished http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java new file mode 100644 index 0000000..cd4f8b9 --- /dev/null +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.job.mr; + +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.log4j.Logger; + + +/** + * Runnable that will ping mapreduce context about progress. + */ +public class SqoopProgressRunnable implements Runnable { + + public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class); + + /** + * Context class that we should use for reporting progress. 
+ */ + private final TaskInputOutputContext<?,?,?,?> context; + + public SqoopProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctx) { + this.context = ctx; + } + + @Override + public void run() { + LOG.debug("Auto-progress thread reporting progress"); + this.context.progress(); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java index a55534a..cf023c3 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> { static { - ConfigurationUtils.configureLogging(); + MRConfigurationUtils.configureLogging(); } public static final Logger LOG = Logger.getLogger(SqoopReducer.class); @@ -46,7 +46,7 @@ public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWrit public void run(Context context) throws IOException, InterruptedException { try { LOG.info("Starting progress service"); - progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES); + progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES); // Delegating all functionality to our parent super.run(context); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java 
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java index dca4c90..c2f5756 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java @@ -24,7 +24,7 @@ import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.job.MapreduceExecutionError; +import org.apache.sqoop.job.MRExecutionError; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.utils.ClassUtils; @@ -60,12 +60,12 @@ public class SqoopSplit extends InputSplit implements Writable { // instantiate Partition object Class<?> clz = ClassUtils.loadClass(className); if (clz == null) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, className); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0009, className); } try { partition = (Partition) clz.newInstance(); } catch (Exception e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, className, e); + throw new SqoopException(MRExecutionError.MAPRED_EXEC_0010, className, e); } // read Partition object content partition.readFields(in); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index e3b68e2..6d0dcb4 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -45,7 +45,7 @@ import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; import org.apache.sqoop.job.io.Data; import 
org.apache.sqoop.job.io.SqoopWritable; -import org.apache.sqoop.job.mr.ConfigurationUtils; +import org.apache.sqoop.job.mr.MRConfigurationUtils; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.job.mr.SqoopNullOutputFormat; @@ -68,8 +68,8 @@ public class TestMapReduce { @Test public void testInputFormat() throws Exception { Configuration conf = new Configuration(); - conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); @@ -87,17 +87,17 @@ public class TestMapReduce { @Test public void testMapper() throws Exception { Configuration conf = new Configuration(); - conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Schema schema = new Schema("Test"); schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) .addColumn(new org.apache.sqoop.schema.type.Text("3")); Job job = new Job(conf); - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); boolean success = JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); Assert.assertEquals("Job 
failed!", true, success); @@ -106,20 +106,20 @@ public class TestMapReduce { @Test public void testOutputFormat() throws Exception { Configuration conf = new Configuration(); - conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); - conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); - conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); - conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName()); + conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName()); + conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName()); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Schema schema = new Schema("Test"); schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2")) .addColumn(new Text("3")); Job job = new Job(conf); - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); - ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema); boolean success = JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, SqoopNullOutputFormat.class); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java 
b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java index 7f9a147..665a65b 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -42,7 +42,7 @@ import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.SqoopWritable; -import org.apache.sqoop.job.mr.ConfigurationUtils; +import org.apache.sqoop.job.mr.MRConfigurationUtils; import org.apache.sqoop.job.mr.SqoopInputFormat; import org.apache.sqoop.job.mr.SqoopMapper; import org.apache.sqoop.schema.Schema; @@ -123,14 +123,14 @@ public class TestMatching { @Test public void testSchemaMatching() throws Exception { Configuration conf = new Configuration(); - conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); - conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); - conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT, + conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName()); + conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName()); + conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName()); Job job = new Job(conf); - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); - ConfigurationUtils.setConnectorSchema(Direction.TO, job, to); + MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from); + MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to); JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class); boolean success = JobUtils.runJob(job.getConfiguration(), http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java ---------------------------------------------------------------------- diff --git 
a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java index f5742a2..68ce5ed 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java @@ -31,7 +31,7 @@ import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.job.JobConstants; +import org.apache.sqoop.job.MRJobConstants; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java deleted file mode 100644 index 501e32c..0000000 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.job.mr; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.sqoop.common.Direction; -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Config; -import org.apache.sqoop.model.ConfigClass; -import org.apache.sqoop.model.Input; -import org.apache.sqoop.schema.Schema; -import org.apache.sqoop.schema.type.Text; -import org.junit.Before; -import org.junit.Test; - -/** - * Current tests are using mockito to propagate credentials from hadoop Job object - * to hadoop JobConf object. This implementation was chosen because it's not clear - * how MapReduce is converting one object to another. - */ -public class TestConfigurationUtils { - - Job job; - JobConf jobConfSpy; - - @Before - public void setUp() throws Exception { - setUpHadoopJob(); - setUpHadoopJobConf(); - } - - public void setUpHadoopJob() throws Exception { - job = new Job(); - } - - public void setUpHadoopJobConf() throws Exception { - jobConfSpy = spy(new JobConf(job.getConfiguration())); - when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials()); - } - - @Test - public void testLinkConfiguration() throws Exception { - ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig()); - setUpHadoopJobConf(); - assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy)); - - ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig()); - setUpHadoopJobConf(); - assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy)); - } - - @Test - public void testJobConfiguration() throws Exception { - 
ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig()); - setUpHadoopJobConf(); - assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy)); - - ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig()); - setUpHadoopJobConf(); - assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy)); - } - - @Test - public void testDriverConfiguration() throws Exception { - ConfigurationUtils.setDriverConfig(job, getConfig()); - setUpHadoopJobConf(); - assertEquals(getConfig(), ConfigurationUtils.getDriverConfig(jobConfSpy)); - } - - @Test - public void testConnectorSchema() throws Exception { - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a")); - assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); - - ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b")); - assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy)); - } - - @Test - public void testConnectorSchemaNull() throws Exception { - ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null); - assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); - - ConfigurationUtils.setConnectorSchema(Direction.TO, job, null); - assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy)); - } - - private Schema getSchema(String name) { - return new Schema(name).addColumn(new Text("c1")); - } - - private TestConfiguration getConfig() { - TestConfiguration c = new TestConfiguration(); - c.c.A = "This is secret text!"; - return c; - } - - @ConfigClass - public static class C { - - @Input String A; - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof C)) return false; - - C c = (C) o; - - if (A != null ? 
!A.equals(c.A) : c.A != null) return false; - - return true; - } - - @Override - public int hashCode() { - return A != null ? A.hashCode() : 0; - } - } - - @ConfigurationClass - public static class TestConfiguration { - @Config C c; - - public TestConfiguration() { - c = new C(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof TestConfiguration)) return false; - - TestConfiguration config = (TestConfiguration) o; - - if (c != null ? !c.equals(config.c) : config.c != null) - return false; - - return true; - } - - @Override - public int hashCode() { - return c != null ? c.hashCode() : 0; - } - } -}
