Repository: sqoop Updated Branches: refs/heads/sqoop2 151a0a12a -> 8c604754a
SQOOP-1554: Add NullConfigurationClass/ EmptyConfigurationClass to support use cases that do not have a particular type of config (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/8c604754 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/8c604754 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/8c604754 Branch: refs/heads/sqoop2 Commit: 8c604754a4a8f17faaf9f946962976b937547f22 Parents: 151a0a1 Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Oct 22 22:52:07 2014 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Oct 22 22:52:07 2014 -0700 ---------------------------------------------------------------------- .../sqoop/connector/hdfs/HdfsConnector.java | 20 ++++---- .../sqoop/connector/hdfs/HdfsDestroyer.java | 36 ------------- .../sqoop/connector/hdfs/HdfsExtractor.java | 22 ++++---- .../sqoop/connector/hdfs/HdfsFromDestroyer.java | 38 ++++++++++++++ .../connector/hdfs/HdfsFromInitializer.java | 48 +++++++++++++++++ .../sqoop/connector/hdfs/HdfsInitializer.java | 44 ---------------- .../apache/sqoop/connector/hdfs/HdfsLoader.java | 12 ++--- .../sqoop/connector/hdfs/HdfsPartitioner.java | 20 ++++---- .../sqoop/connector/hdfs/HdfsToDestroyer.java | 38 ++++++++++++++ .../sqoop/connector/hdfs/HdfsToInitializer.java | 48 +++++++++++++++++ .../hdfs/configuration/LinkConfig.java | 29 ----------- .../hdfs/configuration/LinkConfiguration.java | 31 ----------- .../resources/hdfs-connector-config.properties | 8 --- .../sqoop/connector/hdfs/TestExtractor.java | 28 +++++----- .../apache/sqoop/connector/hdfs/TestLoader.java | 34 ++++++------ .../sqoop/connector/hdfs/TestPartitioner.java | 25 ++++----- .../connector/common/EmptyConfiguration.java | 29 +++++++++++ .../org/apache/sqoop/driver/JobManager.java | 34 ++++++------ .../org/apache/sqoop/job/TestMapReduce.java | 21 ++++---- 
.../java/org/apache/sqoop/job/TestMatching.java | 9 ++-- .../org/apache/sqoop/job/etl/Destroyer.java | 4 +- .../org/apache/sqoop/job/etl/Extractor.java | 10 ++-- .../org/apache/sqoop/job/etl/Initializer.java | 54 ++++++++++++-------- .../java/org/apache/sqoop/job/etl/Loader.java | 9 ++-- .../org/apache/sqoop/job/etl/Partitioner.java | 10 ++-- 25 files changed, 365 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java index e63e464..cce0e29 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java @@ -18,10 +18,13 @@ package org.apache.sqoop.connector.hdfs; +import java.util.Locale; +import java.util.ResourceBundle; + import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.common.VersionInfo; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; @@ -30,21 +33,18 @@ import org.apache.sqoop.job.etl.From; import org.apache.sqoop.job.etl.To; import org.apache.sqoop.validation.Validator; -import java.util.Locale; -import java.util.ResourceBundle; - public class HdfsConnector extends SqoopConnector { private static final From FROM 
= new From( - HdfsInitializer.class, + HdfsFromInitializer.class, HdfsPartitioner.class, HdfsExtractor.class, - HdfsDestroyer.class); + HdfsFromDestroyer.class); private static final To TO = new To( - HdfsInitializer.class, + HdfsToInitializer.class, HdfsLoader.class, - HdfsDestroyer.class); + HdfsToDestroyer.class); private static final HdfsValidator hdfsValidator = new HdfsValidator(); @@ -71,15 +71,17 @@ public class HdfsConnector extends SqoopConnector { /** * @return Get connection configuration class */ + @SuppressWarnings("rawtypes") @Override public Class getLinkConfigurationClass() { - return LinkConfiguration.class; + return EmptyConfiguration.class; } /** * @param jobType * @return Get job configuration class for given type or null if not supported */ + @SuppressWarnings("rawtypes") @Override public Class getJobConfigurationClass(Direction jobType) { switch (jobType) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java deleted file mode 100644 index 74b1cb8..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sqoop.connector.hdfs; - -import org.apache.sqoop.job.etl.Destroyer; -import org.apache.sqoop.job.etl.DestroyerContext; - -public class HdfsDestroyer extends Destroyer { - /** - * Callback to clean up after job execution. - * - * @param context Destroyer context - * @param o Connection configuration object - * @param o2 Job configuration object - */ - @Override - public void destroy(DestroyerContext context, Object o, Object o2) { - //TODO: Add a "success" flag? - - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java index 2c8b6c8..31b0a99 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java @@ -17,6 +17,9 @@ */ package org.apache.sqoop.connector.hdfs; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -26,23 +29,20 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import 
org.apache.hadoop.util.LineReader; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; -import org.apache.log4j.Logger; -import org.apache.hadoop.conf.Configuration; -import org.apache.sqoop.common.PrefixContext; - -import java.io.IOException; /** * Extract from HDFS. * Default field delimiter of a record is comma. */ -public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> { +public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> { public static final Logger LOG = Logger.getLogger(HdfsExtractor.class); @@ -51,9 +51,8 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura private long rowRead = 0; @Override - public void extract(ExtractorContext context, - LinkConfiguration linkConfig, - FromJobConfiguration fromJobConfig, HdfsPartition partition) { + public void extract(ExtractorContext context, EmptyConfiguration linkConfig, + FromJobConfiguration jobConfig, HdfsPartition partition) { conf = ((PrefixContext) context.getContext()).getConfiguration(); dataWriter = context.getDataWriter(); @@ -91,6 +90,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura * @param length * @throws IOException */ + @SuppressWarnings("deprecation") private void extractSequenceFile(Path file, long start, long length) throws IOException { LOG.info("Extracting sequence file"); @@ -123,6 +123,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura * @param length * @throws IOException */ + 
@SuppressWarnings("resource") private void extractTextFile(Path file, long start, long length) throws IOException { LOG.info("Extracting text file"); @@ -182,6 +183,7 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura * @param file * @return boolean */ + @SuppressWarnings("deprecation") private boolean isSequenceFile(Path file) { SequenceFile.Reader filereader = null; try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java new file mode 100644 index 0000000..c7d35f7 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.connector.common.EmptyConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param linkConfig link configuration object + * @param jobConfig FROM job configuration object + */ + @Override + public void destroy(DestroyerContext context, EmptyConfiguration linkConfig, + FromJobConfiguration jobConfig) { + // do nothing at this point + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java new file mode 100644 index 0000000..0752510 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.connector.common.EmptyConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; + + +public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> { + /** + * Initialize new submission based on given configuration properties. Any + * needed temporary values might be saved to context object and they will be + * promoted to all other part of the workflow automatically. 
+ * + * @param context Initializer context object + * @param linkConfig link configuration object + * @param jobConfig FROM job configuration object + */ + @Override + public void initialize(InitializerContext context, EmptyConfiguration linkConfig, + FromJobConfiguration jobConfig) { + // do nothing at this point + } + + @Override + public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig, + FromJobConfiguration jobConfig) { + return new Schema("HDFS file"); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java deleted file mode 100644 index bb5e353..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs; - -import org.apache.sqoop.job.etl.Initializer; -import org.apache.sqoop.job.etl.InitializerContext; -import org.apache.sqoop.schema.Schema; - - -public class HdfsInitializer extends Initializer { - /** - * Initialize new submission based on given configuration properties. Any - * needed temporary values might be saved to context object and they will be - * promoted to all other part of the workflow automatically. - * - * @param context Initializer context object - * @param linkConfig Connector's link configuration object - * @param jobConf Connector's job configuration object - */ - @Override - public void initialize(InitializerContext context, Object linkConfig, Object jobConf) { - - } - - @Override - public Schema getSchema(InitializerContext context, Object linkConfig, Object jobConfig) { - return new Schema("HDFS file"); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java index 660418d..682349c 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java @@ -17,13 +17,16 @@ */ package org.apache.sqoop.connector.hdfs; +import java.io.IOException; +import java.util.UUID; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import 
org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter; @@ -34,10 +37,7 @@ import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.utils.ClassUtils; -import java.io.IOException; -import java.util.UUID; - -public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { +public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> { /** * Load data to target. * @@ -47,7 +47,7 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> { * @throws Exception */ @Override - public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception { + public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception { DataReader reader = context.getDataReader(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java index f40459f..daa7fe2 100644 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java @@ -21,36 +21,36 @@ package org.apache.sqoop.connector.hdfs; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import 
java.util.List; -import java.util.HashMap; -import java.util.Set; -import java.util.Iterator; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.NodeBase; +import org.apache.sqoop.common.PrefixContext; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; -import org.apache.sqoop.common.PrefixContext; /** * This class derives mostly from CombineFileInputFormat of Hadoop, i.e. * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat. 
*/ -public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> { +public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> { public static final String SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node"; @@ -68,7 +68,7 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi @Override public List<Partition> getPartitions(PartitionerContext context, - LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { + EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) { Configuration conf = ((PrefixContext)context.getContext()).getConfiguration(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java new file mode 100644 index 0000000..8bfd727 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.connector.common.EmptyConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> { + /** + * Callback to clean up after job execution. + * + * @param context Destroyer context + * @param linkConfig link configuration object + * @param jobConfig TO job configuration object + */ + @Override + public void destroy(DestroyerContext context, EmptyConfiguration linkConfig, + ToJobConfiguration jobConfig) { + // do nothing at this point + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java new file mode 100644 index 0000000..e3d54b8 --- /dev/null +++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.connector.hdfs; + +import org.apache.sqoop.connector.common.EmptyConfiguration; +import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; +import org.apache.sqoop.schema.Schema; + + +public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> { + /** + * Initialize new submission based on given configuration properties. Any + * needed temporary values might be saved to context object and they will be + * promoted to all other part of the workflow automatically. 
+ * + * @param context Initializer context object + * @param linkConfig link configuration object + * @param jobConfig TO job configuration object + */ + @Override + public void initialize(InitializerContext context, EmptyConfiguration linkConfig, + ToJobConfiguration jobConfig) { + // do nothing at this point + } + + @Override + public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig, + ToJobConfiguration jobConfig) { + return new Schema("HDFS file"); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java deleted file mode 100644 index 5d48a29..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.ConfigClass; -import org.apache.sqoop.model.Input; - -@ConfigClass -public class LinkConfig { - //Todo: Didn't find anything that belongs here... - // Since empty forms don't work (DERBYREPO_0008:The config contains no input metadata), I'm putting a dummy config here - - @Input(size = 255) public String dummy; -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java deleted file mode 100644 index c0cd336..0000000 --- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sqoop.connector.hdfs.configuration; - -import org.apache.sqoop.model.ConfigurationClass; -import org.apache.sqoop.model.Config; - -@ConfigurationClass -public class LinkConfiguration { - @Config - public LinkConfig linkConfig; - - public LinkConfiguration() { - linkConfig = new LinkConfig(); - } -} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties index 9b8c6ba..90bc8bc 100644 --- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties +++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties @@ -16,14 +16,6 @@ # Generic HDFS Connector Resources ############################ -# Link Config -# -linkConfig.label = Link configuration -linkConfig.help = You must supply the information requested in order to \ - create a connection object. - -linkConfig.dummy.label = Dummy parameter needed to get HDFS connector to register -linkConfig.dummy.help = You can write anything here. Doesn't matter. 
# To Job Config # http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java index c6d2f90..124c3df 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java @@ -17,12 +17,20 @@ */ package org.apache.sqoop.connector.hdfs; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.etl.io.DataWriter; @@ -35,14 +43,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; - @RunWith(Parameterized.class) public 
class TestExtractor extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; @@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase { private ToFormat outputFileType; private Class<? extends CompressionCodec> compressionClass; private final String inputDirectory; - private Extractor extractor; + private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor; public TestExtractor(ToFormat outputFileType, Class<? extends CompressionCodec> compressionClass) @@ -131,13 +131,11 @@ public class TestExtractor extends TestHdfsBase { } }); - LinkConfiguration connConf = new LinkConfiguration(); - - FromJobConfiguration jobConf = new FromJobConfiguration(); - + EmptyConfiguration emptyLinkConfig = new EmptyConfiguration(); + FromJobConfiguration emptyJobConfig = new FromJobConfiguration(); HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory)); - extractor.extract(context, connConf, jobConf, partition); + extractor.extract(context, emptyLinkConfig, emptyJobConfig, partition); for (int index = 0; index < NUMBER_OF_FILES * NUMBER_OF_ROWS_PER_FILE; ++index) { Assert.assertTrue("Index " + (index + 1) + " was not visited", visited[index]); http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java index 552a751..8429e15 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java @@ -17,6 +17,16 @@ */ package org.apache.sqoop.connector.hdfs; +import static 
org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -27,7 +37,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToCompression; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration; @@ -41,16 +51,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; - @RunWith(Parameterized.class) public class TestLoader extends TestHdfsBase { private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/"; @@ -121,25 +121,25 @@ public class TestLoader extends TestHdfsBase { return null; } }, null); - LinkConfiguration connConf = new LinkConfiguration(); + EmptyConfiguration linkConf = new EmptyConfiguration(); ToJobConfiguration jobConf = new ToJobConfiguration(); jobConf.toJobConfig.outputDirectory = outputDirectory; 
jobConf.toJobConfig.compression = compression; jobConf.toJobConfig.outputFormat = outputFormat; Path outputPath = new Path(outputDirectory); - loader.load(context, connConf, jobConf); + loader.load(context, linkConf, jobConf); Assert.assertEquals(1, fs.listStatus(outputPath).length); for (FileStatus status : fs.listStatus(outputPath)) { verifyOutput(fs, status.getPath()); } - loader.load(context, connConf, jobConf); + loader.load(context, linkConf, jobConf); Assert.assertEquals(2, fs.listStatus(outputPath).length); - loader.load(context, connConf, jobConf); - loader.load(context, connConf, jobConf); - loader.load(context, connConf, jobConf); + loader.load(context, linkConf, jobConf); + loader.load(context, linkConf, jobConf); + loader.load(context, linkConf, jobConf); Assert.assertEquals(5, fs.listStatus(outputPath).length); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java index 9d177ec..bef1984 100644 --- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java +++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java @@ -17,12 +17,21 @@ */ package org.apache.sqoop.connector.hdfs; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.SEQUENCE_FILE; +import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.TEXT_FILE; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.BZip2Codec; import 
org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.sqoop.common.PrefixContext; -import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.job.etl.Partition; @@ -30,17 +39,9 @@ import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import static org.apache.sqoop.connector.hdfs.configuration.ToFormat.*; -import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestPartitioner extends TestHdfsBase { @@ -97,12 +98,12 @@ public class TestPartitioner extends TestHdfsBase { Configuration conf = new Configuration(); PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context."); PartitionerContext context = new PartitionerContext(prefixContext, 5, null); - LinkConfiguration connConf = new LinkConfiguration(); + EmptyConfiguration linkConf = new EmptyConfiguration(); FromJobConfiguration jobConf = new FromJobConfiguration(); jobConf.fromJobConfig.inputDirectory = inputDirectory; - List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf); + List<Partition> partitions = partitioner.getPartitions(context, linkConf, jobConf); if (this.compressionClass == null) { assertEquals(5, partitions.size()); http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java 
---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java new file mode 100644 index 0000000..60b9e93 --- /dev/null +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/EmptyConfiguration.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.connector.common; + +import org.apache.sqoop.model.ConfigurationClass; + +/** + * Marker configuration class with no configs, for connectors that do not need a particular type of configuration (e.g. link configuration) + * +**/ +@ConfigurationClass +public class EmptyConfiguration { + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 51e562c..e83002d 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -350,25 +350,25 @@ public class JobManager implements Reconfigurable { jobRequest.setJobName(job.getName()); jobRequest.setJobId(job.getPersistenceId()); jobRequest.setNotificationUrl(notificationBaseUrl + jobId); - Class<? extends IntermediateDataFormat<?>> dataFormatClass = - fromConnector.getIntermediateDataFormat(); + Class<?
extends IntermediateDataFormat<?>> dataFormatClass = fromConnector + .getIntermediateDataFormat(); jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat()); - jobRequest.setFrom(fromConnector.getFrom()); jobRequest.setTo(toConnector.getTo()); + // set all the jars addStandardJars(jobRequest); addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass); addConnectorInitializerJars(jobRequest, Direction.FROM); addConnectorInitializerJars(jobRequest, Direction.TO); - Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM); - Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO); - + // call the initialize method + initializeConnector(jobRequest, Direction.FROM); + initializeConnector(jobRequest, Direction.TO); - jobRequest.getSummary().setFromSchema(fromSchema); - jobRequest.getSummary().setToSchema(toSchema); + jobRequest.getSummary().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM)); + jobRequest.getSummary().setToSchema(getSchemaForConnector(jobRequest, Direction.TO)); LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo()); return jobRequest; @@ -435,21 +435,22 @@ public class JobManager implements Reconfigurable { } return job; } - - private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) { + private void initializeConnector(JobRequest jobRequest, Direction direction) { Initializer initializer = getConnectorInitializer(jobRequest, direction); - - // Initializer context InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); // Initialize submission from the connector perspective initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)); + } + private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) { + + Initializer initializer = getConnectorInitializer(jobRequest, direction); + InitializerContext
initializerContext = getConnectorInitializerContext(jobRequest, direction); - return initializer.getSchema(initializerContext, - jobRequest.getConnectorLinkConfig(direction), + return initializer.getSchema(initializerContext, jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)); } @@ -459,8 +460,7 @@ public class JobManager implements Reconfigurable { InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); // Add job specific jars to jobRequest.addJars(initializer.getJars(initializerContext, - jobRequest.getConnectorLinkConfig(direction), - jobRequest.getJobConfig(direction))); + jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction))); } private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) { @@ -698,4 +698,4 @@ public class JobManager implements Reconfigurable { LOG.info("Ending submission manager update thread"); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 6d0dcb4..78ae4ec 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.job; +import static org.junit.Assert.assertEquals; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -33,6 +35,7 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.sqoop.common.Direction; +import 
org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; @@ -57,8 +60,6 @@ import org.apache.sqoop.schema.type.Text; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; - public class TestMapReduce { private static final int START_PARTITION = 1; @@ -170,9 +171,9 @@ public class TestMapReduce { } } - public static class DummyExtractor extends Extractor { + public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> { @Override - public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { + public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { context.getDataWriter().writeArrayRecord(new Object[] { @@ -250,12 +251,12 @@ public class TestMapReduce { } } - public static class DummyLoader extends Loader { + public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> { private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION; private Data expected = new Data(); @Override - public void load(LoaderContext context, Object oc, Object oj) throws Exception{ + public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{ String data; while ((data = context.getDataReader().readTextRecord()) != null) { expected.setContent(new Object[] { @@ -269,22 +270,22 @@ public class TestMapReduce { } } - public static class DummyFromDestroyer extends Destroyer { + public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> { public static int count = 0; @Override - public void destroy(DestroyerContext context, Object o, Object o2) { + 
public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { count++; } } - public static class DummyToDestroyer extends Destroyer { + public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> { public static int count = 0; @Override - public void destroy(DestroyerContext context, Object o, Object o2) { + public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) { count++; } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java index 665a65b..04fb692 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java @@ -17,6 +17,8 @@ */ package org.apache.sqoop.job; +import static org.junit.Assert.assertEquals; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -34,6 +36,7 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.sqoop.common.Direction; +import org.apache.sqoop.connector.common.EmptyConfiguration; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; @@ -53,8 +56,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.junit.Assert.assertEquals; - @RunWith(Parameterized.class) public class TestMatching { @@ -193,9 +194,9 @@ public class TestMatching { } } - public static class DummyExtractor extends Extractor { + public static class 
DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, Partition> { @Override - public void extract(ExtractorContext context, Object oc, Object oj, Object partition) { + public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, Partition partition) { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) { context.getDataWriter().writeArrayRecord(new Object[] { http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index a133106..e2d98ca 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -28,7 +28,9 @@ public abstract class Destroyer<LinkConfiguration, JobConfiguration> { * * @param context Destroyer context * @param linkConfiguration link configuration object - * @param jobConfiguration job configuration object + * @param jobConfiguration job configuration object for the FROM and TO + * In case of the FROM destroyer this will represent the FROM job configuration + * In case of the TO destroyer this will represent the TO job configuration */ public abstract void destroy(DestroyerContext context, LinkConfiguration linkConfiguration, http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java index d6c186d..85e91ef 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java @@ -21,20
+21,20 @@ package org.apache.sqoop.job.etl; * This allows connector to extract data from a source system * based on each partition. */ -public abstract class Extractor<LinkConfiguration, JobConfiguration, Partition> { +public abstract class Extractor<LinkConfiguration, FromJobConfiguration, SqoopPartition> { /** * Extract data from source and pass them into the Sqoop. * * @param context Extractor context object * @param linkConfiguration link configuration object - * @param jobConfiguration job configuration object - * @param partition Partition that this extract should work on + * @param jobConfiguration FROM job configuration object + * @param partition Partition that this extractor should work on */ public abstract void extract(ExtractorContext context, LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration, - Partition partition); + FromJobConfiguration jobConfiguration, + SqoopPartition partition); /** * Return the number of rows read by the last call to http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index 5c48fc3..d66b099 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -35,27 +35,39 @@ public abstract class Initializer<LinkConfiguration, JobConfiguration> { * * @param context Initializer context object * @param linkConfiguration link configuration object - * @param jobConfiguration job configuration object + * @param jobConfiguration job configuration object for the FROM and TO + * In case of the FROM initializer this will represent the FROM job configuration + * In case of the TO initializer this will represent the TO job configuration */ public abstract void
initialize(InitializerContext context, - LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); - - /** - * Return list of all jars that this particular connector needs to operate - * on following job. This method will be called after running initialize - * method. - * - * @return - */ - public List<String> getJars(InitializerContext context, - LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration) { - return new LinkedList<String>(); - } - - public abstract Schema getSchema(InitializerContext context, - LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); + public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration); + + /** + * Return list of all jars that this particular connector needs to operate on + * following job. This method will be called after running initialize method. + * @param context Initializer context object + * @param linkConfiguration link configuration object + * @param jobConfiguration job configuration object for the FROM and TO + * In case of the FROM initializer this will represent the FROM job configuration + * In case of the TO initializer this will represent the TO job configuration + * @return + */ + public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) { + return new LinkedList<String>(); + } + + /** + * Return schema associated with the connector for FROM and TO + * @param context Initializer context object + * @param linkConfiguration link configuration object + * @param jobConfiguration job configuration object for the FROM and TO + * In case of the FROM initializer this will represent the FROM job configuration + * In case of the TO initializer this will represent the TO job configuration + * @return + */ + + public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration, + JobConfiguration 
jobConfiguration); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java index cc32ada..3b6bd71 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java @@ -20,18 +20,17 @@ package org.apache.sqoop.job.etl; /** * This allows connector to load data into a target system. */ -public abstract class Loader<LinkConfiguration, JobConfiguration> { +public abstract class Loader<LinkConfiguration, ToJobConfiguration> { /** * Load data to target. * * @param context Loader context object * @param linkConfiguration link configuration object - * @param jobConfiguration job configuration object + * @param jobConfiguration TO job configuration object * @throws Exception */ - public abstract void load(LoaderContext context, - LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration) throws Exception; + public abstract void load(LoaderContext context, LinkConfiguration linkConfiguration, + ToJobConfiguration jobConfiguration) throws Exception; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/8c604754/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java index 57507df..3636130 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java @@ -20,10 +20,10 @@ package org.apache.sqoop.job.etl; import java.util.List; /** - * This allows connector to define how input data to be partitioned. 
+ * This allows connector to define how input data from the FROM source can be partitioned. * The number of data partitions also determines the degree of parallelism. */ -public abstract class Partitioner<LinkConfiguration, JobConfiguration> { +public abstract class Partitioner<LinkConfiguration, FromJobConfiguration> { /** * Partition input data into partitions. @@ -35,8 +35,6 @@ public abstract class Partitioner<LinkConfiguration, JobConfiguration> { * @param jobConfiguration job configuration object * @return */ - public abstract List<Partition> getPartitions(PartitionerContext context, - LinkConfiguration linkConfiguration, - JobConfiguration jobConfiguration); - + public abstract List<Partition> getPartitions(PartitionerContext context, + LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration); }
