Repository: sqoop Updated Branches: refs/heads/sqoop2 26ae15c20 -> d3fbe102a
SQOOP-2490: Sqoop2: Add extra jars to job (Abraham Fine via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d3fbe102 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d3fbe102 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d3fbe102 Branch: refs/heads/sqoop2 Commit: d3fbe102acf7360252ba47e5fd0c3f297bdd9404 Parents: 26ae15c Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Oct 6 20:31:28 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Oct 6 20:31:28 2015 -0700 ---------------------------------------------------------------------- .../sqoop/core/ConfigurationConstants.java | 5 + .../apache/sqoop/core/SqoopConfiguration.java | 66 ++-- .../org/apache/sqoop/driver/JobManager.java | 32 ++ .../sqoop/driver/configuration/JarConfig.java | 30 ++ .../driver/configuration/JobConfiguration.java | 5 + .../apache/sqoop/repository/JdbcRepository.java | 1 + .../src/main/resources/driver-config.properties | 7 + .../sqoop/driver/TestJobConfiguration.java | 63 ++++ dist/src/main/server/conf/sqoop.properties | 5 + .../test/minicluster/SqoopMiniCluster.java | 6 + .../integration/classpath/ClasspathTest.java | 333 +++++++++++++++++++ .../resources/TestConnector/TestConnector.java | 89 +++++ .../resources/TestConnector/TestDependency.java | 23 ++ .../TestConnector/TestLinkConfiguration.java | 25 ++ .../resources/TestConnector/TestLoader.java | 45 +++ .../TestConnector/TestToDestroyer.java | 27 ++ .../TestConnector/TestToInitializer.java | 27 ++ .../TestConnector/TestToJobConfiguration.java | 26 ++ .../TestConnector/sqoopconnector.properties | 18 + .../test/resources/classpath-tests-suite.xml | 33 ++ 20 files changed, 838 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java b/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java index d7fe27b..af53acb 100644 --- a/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java +++ b/core/src/main/java/org/apache/sqoop/core/ConfigurationConstants.java @@ -82,6 +82,11 @@ public final class ConfigurationConstants { public static final String CLASSPATH = "org.apache.sqoop.classpath.extra"; /** + * Add external jars to job classpath. + */ + public static final String JOB_CLASSPATH = "org.apache.sqoop.classpath.job"; + + /** * Enable Sqoop App to kill Tomcat in case that it will fail to load. */ public static final String KILL_TOMCAT_ON_FAILURE = "sqoop.kill_tomcat_on_load_failure"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java index 52e432d..ab5b2ee 100644 --- a/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/core/SqoopConfiguration.java @@ -38,6 +38,7 @@ import org.apache.sqoop.classification.InterfaceStability; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.error.code.CoreError; +import org.apache.sqoop.utils.ClassUtils; import static org.apache.sqoop.utils.ContextUtils.getUniqueStrings; @@ -189,7 +190,7 @@ public class SqoopConfiguration implements Reconfigurable { initialized = true; - configureClassLoader(ConfigurationConstants.CLASSPATH); + configureClassLoader(); } public synchronized MapContext getContext() { @@ -229,44 +230,53 @@ public class SqoopConfiguration implements Reconfigurable { /** * Load 
extra classpath from sqoop configuration. - * @param classpathProperty */ - private synchronized void configureClassLoader(String classpathProperty) { - LOG.info("Adding jars to current classloader from property: " + classpathProperty); - - // CSV URL list separated by ":". - String paths = getContext().getString(classpathProperty); - - if (StringUtils.isEmpty(paths)) { - LOG.debug("Property " + classpathProperty + " is null or empty. Not adding any extra jars."); - return; - } + private synchronized void configureClassLoader() { + LOG.info("Adding jars to current classloader from property: " + ConfigurationConstants.CLASSPATH); + List<URL> urls = getJarsForProperty(ConfigurationConstants.CLASSPATH); + // Chain the current thread classloader so that + // configured classpath adds to existing classloader. + // Existing classpath is not changed. ClassLoader currentThreadClassLoader = Thread.currentThread().getContextClassLoader(); if (currentThreadClassLoader == null) { throw new SqoopException(CoreError.CORE_0009, "No thread context classloader to override."); } + URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), + currentThreadClassLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } - Set<String> pathSet = getUniqueStrings(paths); - List<URL> urls = new LinkedList<URL>(); - for (String path : pathSet) { - try { - LOG.debug("Found jar in path: " + path); - URL url = new File(path).toURI().toURL(); - urls.add(url); - LOG.debug("Using URL: " + url.toString()); - } catch (MalformedURLException e) { - throw new SqoopException(CoreError.CORE_0009, "Malformed URL found.", e); + /** + * Get the list of jars for a given property + * + * @param classpathProperty The property corresponding to the list of jars + * @return A list of URLs pointing to the jars + */ + public synchronized List<URL> getJarsForProperty(String classpathProperty) { + List<URL> urls = new LinkedList<>(); + + // CSV URL list separated by ":". 
+ String paths = getContext().getString(classpathProperty); + + if (StringUtils.isEmpty(paths)) { + LOG.debug("Property " + classpathProperty + " is null or empty."); + } else { + Set<String> pathSet = getUniqueStrings(paths); + for (String path : pathSet) { + try { + LOG.debug("Found jar in path: " + path); + URL url = new File(path).toURI().toURL(); + urls.add(url); + LOG.debug("Using URL: " + url.toString()); + } catch (MalformedURLException e) { + throw new SqoopException(CoreError.CORE_0009, "Malformed URL found.", e); + } } } - // Chain the current thread classloader so that - // configured classpath adds to existing classloader. - // Existing classpath is not changed. - URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]), - currentThreadClassLoader); - Thread.currentThread().setContextClassLoader(classLoader); + return urls; } private synchronized void configureLogging() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index ebb7efd..0d230f9 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -17,8 +17,11 @@ */ package org.apache.sqoop.driver; +import java.net.URL; import java.util.Date; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; @@ -27,6 +30,7 @@ import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.core.ConfigurationConstants; import org.apache.sqoop.core.Reconfigurable; import 
org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener; @@ -163,6 +167,11 @@ public class JobManager implements Reconfigurable { private String notificationBaseUrl; /** + * Additional jars to be made available during job execution + */ + private List<URL> extraJobClasspath; + + /** * Set notification base URL. * * @param url @@ -210,6 +219,8 @@ public class JobManager implements Reconfigurable { if (executionEngine != null) { executionEngine.destroy(); } + + extraJobClasspath = null; } public synchronized void initialize() { @@ -241,6 +252,8 @@ public class JobManager implements Reconfigurable { executionEngineClassName); } + extraJobClasspath = SqoopConfiguration.getInstance().getJarsForProperty(ConfigurationConstants.JOB_CLASSPATH); + // We need to make sure that user has configured compatible combination of // submission engine and execution engine if (!submissionEngine @@ -372,6 +385,8 @@ public class JobManager implements Reconfigurable { // set all the jars addStandardJars(jobRequest); + addJobJars(jobRequest); + addExtraJarsFromSqoopConfig(jobRequest); addConnectorClass(jobRequest, fromConnector); addConnectorClass(jobRequest, toConnector); addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat()); @@ -419,6 +434,21 @@ public class JobManager implements Reconfigurable { jobRequest.addJarForClass(executionEngine.getClass()); } + private void addExtraJarsFromSqoopConfig(JobRequest jobRequest) { + Set<String> jars = new HashSet<>(); + for (URL jarURL : extraJobClasspath){ + jars.add(jarURL.toString()); + } + jobRequest.addJars(jars); + } + + private void addJobJars(JobRequest jobRequest) { + JobConfiguration jobConfiguration = (JobConfiguration) jobRequest.getDriverConfig(); + if (jobConfiguration.jarConfig.extraJars != null) { + jobRequest.addJars(new HashSet<>(jobConfiguration.jarConfig.extraJars)); + } + } + MSubmission createJobSubmission(HttpEventContext ctx, long jobId) { 
MSubmission summary = new MSubmission(jobId); summary.setCreationUser(ctx.getUsername()); @@ -698,6 +728,8 @@ public class JobManager implements Reconfigurable { "You might need to restart the server."); } + extraJobClasspath = SqoopConfiguration.getInstance().getJarsForProperty(ConfigurationConstants.JOB_CLASSPATH); + // Set up worker threads purgeThreshold = newContext.getLong( DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD, http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/driver/configuration/JarConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JarConfig.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JarConfig.java new file mode 100644 index 0000000..b72b109 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JarConfig.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.driver.configuration; + +import org.apache.sqoop.model.ConfigClass; +import org.apache.sqoop.model.Input; + +import java.util.List; + +@ConfigClass +public class JarConfig { + // A list of the FQDNs of additional jars that are needed to execute the job + @Input public List<String> extraJars; +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java index bf1328a..bd3d8df 100644 --- a/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java +++ b/core/src/main/java/org/apache/sqoop/driver/configuration/JobConfiguration.java @@ -28,7 +28,12 @@ public class JobConfiguration { @Config public ThrottlingConfig throttlingConfig; + @Config + public JarConfig jarConfig; + + public JobConfiguration() { throttlingConfig = new ThrottlingConfig(); + jarConfig = new JarConfig(); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index f9649f2..055ccc4 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -242,6 +242,7 @@ public class JdbcRepository extends Repository { // so let's just compare the structure to see if we need upgrade. 
if(!mDriver.equals(existingDriver)) { if (autoUpgrade) { + mDriver.setPersistenceId(existingDriver.getPersistenceId()); upgradeDriver(mDriver); return mDriver; } else { http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/main/resources/driver-config.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/driver-config.properties b/core/src/main/resources/driver-config.properties index e005775..674e74b 100644 --- a/core/src/main/resources/driver-config.properties +++ b/core/src/main/resources/driver-config.properties @@ -28,3 +28,10 @@ throttlingConfig.numExtractors.help = Number of extractors that Sqoop will use throttlingConfig.numLoaders.label = Loaders throttlingConfig.numLoaders.help = Number of loaders that Sqoop will use +# Jar Configuration +# +jarConfig.label = Classpath configuration +jarConfig.help = Classpath configuration specific to the driver + +jarConfig.extraJars.label = Extra mapper jars +jarConfig.extraJars.help = A list of the FQDNs of additional jars that are needed to execute the job http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/core/src/test/java/org/apache/sqoop/driver/TestJobConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/sqoop/driver/TestJobConfiguration.java b/core/src/test/java/org/apache/sqoop/driver/TestJobConfiguration.java new file mode 100644 index 0000000..4e9a698 --- /dev/null +++ b/core/src/test/java/org/apache/sqoop/driver/TestJobConfiguration.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.driver; + +import org.apache.sqoop.driver.configuration.JobConfiguration; +import org.apache.sqoop.model.ConfigUtils; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MInput; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public class TestJobConfiguration { + + @Test + public void testBundleForLink() { + verifyBundleForConfigurationClass(Driver.getInstance().getBundle(Locale + .getDefault()), JobConfiguration.class); + } + + void verifyBundleForConfigurationClass(ResourceBundle bundle, Class klass) { + assertNotNull(bundle); + assertNotNull(klass); + + List<MConfig> configs = ConfigUtils.toConfigs(klass); + + for(MConfig config : configs) { + assertNotNull(config.getHelpKey()); + assertNotNull(config.getLabelKey()); + + assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName()); + assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName()); + + for(MInput input : config.getInputs()) { + assertNotNull(input.getHelpKey()); + assertNotNull(input.getLabelKey()); + + assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName()); + assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName()); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/dist/src/main/server/conf/sqoop.properties ---------------------------------------------------------------------- diff --git a/dist/src/main/server/conf/sqoop.properties b/dist/src/main/server/conf/sqoop.properties index fe8bcce..d85e381 100755 --- a/dist/src/main/server/conf/sqoop.properties +++ b/dist/src/main/server/conf/sqoop.properties @@ -179,3 +179,8 @@ org.apache.sqoop.connector.external.loadpath= # ":" separated list of jars to be included in sqoop. # org.apache.sqoop.classpath.extra= + +# Sqoop extra classpath to be included with all jobs +# ":" separated list of jars to be included in map job classpath. +# +org.apache.sqoop.classpath.job= http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java index 7440025..328c053 100644 --- a/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java +++ b/test/src/main/java/org/apache/sqoop/test/minicluster/SqoopMiniCluster.java @@ -17,6 +17,7 @@ */ package org.apache.sqoop.test.minicluster; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.sqoop.core.ConfigurationConstants; @@ -138,6 +139,7 @@ public abstract class SqoopMiniCluster { mapToProperties(sqoopProperties, getSecurityConfiguration()); mapToProperties(sqoopProperties, getConnectorManagerConfiguration()); mapToProperties(sqoopProperties, getDriverManagerConfiguration()); + mapToProperties(sqoopProperties, getClasspathConfiguration()); FileUtils.writeLines(f, sqoopProperties); @@ -215,4 +217,8 @@ public abstract class SqoopMiniCluster { 
properties.put(ConfigurationConstants.DRIVER_AUTO_UPGRADE, "true"); return properties; } + + protected Map<String, String> getClasspathConfiguration() { + return MapUtils.EMPTY_MAP; + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/java/org/apache/sqoop/integration/classpath/ClasspathTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/classpath/ClasspathTest.java b/test/src/test/java/org/apache/sqoop/integration/classpath/ClasspathTest.java new file mode 100644 index 0000000..393b800 --- /dev/null +++ b/test/src/test/java/org/apache/sqoop/integration/classpath/ClasspathTest.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.integration.classpath; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.sqoop.client.SqoopClient; +import org.apache.sqoop.core.ConfigurationConstants; +import org.apache.sqoop.model.MDriverConfig; +import org.apache.sqoop.model.MJob; +import org.apache.sqoop.model.MLink; +import org.apache.sqoop.test.minicluster.JettySqoopMiniCluster; +import org.apache.sqoop.test.testcases.ConnectorTestCase; +import org.apache.sqoop.test.utils.HdfsUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import javax.tools.JavaCompiler; +import javax.tools.JavaFileObject; +import javax.tools.StandardJavaFileManager; +import javax.tools.StandardLocation; +import javax.tools.ToolProvider; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +public class ClasspathTest extends ConnectorTestCase { + + private static final String TEST_CONNECTOR_JAR_NAME = "test-connector.jar"; + private static final String TEST_DEPENDENCY_JAR_NAME = "test-dependency.jar"; + + private ClassLoader classLoader; + + public static class DerbySqoopMiniCluster extends JettySqoopMiniCluster { + + private String extraClasspath; + private String jobExtraClasspath; + + public DerbySqoopMiniCluster(String temporaryPath, Configuration configuration, String extraClasspath, String jobExtraClasspath) throws Exception { + super(temporaryPath, configuration); + this.extraClasspath = extraClasspath; + this.jobExtraClasspath = jobExtraClasspath; + } + + 
@Override + protected Map<String, String> getClasspathConfiguration() { + Map<String, String> properties = new HashMap<>(); + + if (extraClasspath != null) { + properties.put(ConfigurationConstants.CLASSPATH, extraClasspath); + } + if (jobExtraClasspath != null) { + properties.put(ConfigurationConstants.JOB_CLASSPATH, jobExtraClasspath); + } + + + return properties; + } + } + + class JarContents { + private List<File> sourceFiles; + private List<File> properitesFiles; + + public JarContents(List<File> sourceFiles, List<File> properitesFiles){ + this.sourceFiles = sourceFiles; + this.properitesFiles = properitesFiles; + } + + public List<File> getSourceFiles() { + return sourceFiles; + } + + public List<File> getProperitesFiles() { + return properitesFiles; + } + } + + public void startSqoopMiniCluster(String extraClasspath, String jobExtraClasspath) throws Exception { + // And use them for new Derby repo instance + setCluster(new DerbySqoopMiniCluster(HdfsUtils.joinPathFragments(super.getSqoopMiniClusterTemporaryPath(), getTestName()), hadoopCluster.getConfiguration(), extraClasspath, jobExtraClasspath)); + + // Start server + getCluster().start(); + + // Initialize Sqoop Client API + setClient(new SqoopClient(getServerUrl())); + } + + @BeforeMethod + public void captureClasspath() { + classLoader = Thread.currentThread().getContextClassLoader(); + } + + @AfterMethod + public void restoreClasspath(){ + Thread.currentThread().setContextClassLoader(classLoader); + } + + @Test + public void testClasspathSqoopProperties() throws Exception { + Map<String, String> jarMap = compileTestConnectorAndDependency(); + startSqoopMiniCluster(jarMap.get(TEST_CONNECTOR_JAR_NAME), jarMap.get + (TEST_DEPENDENCY_JAR_NAME)); + createAndLoadTableCities(); + + MJob job = prepareJob(); + + prepareDriverConfig(job); + + saveJob(job); + + executeJob(job); + + stopSqoop(); + deleteJars(jarMap); + } + + @Test + public void testClasspathDriverInput() throws Exception{ + Map<String, String> 
jarMap = compileTestConnectorAndDependency(); + startSqoopMiniCluster(jarMap.get(TEST_CONNECTOR_JAR_NAME), null); + createAndLoadTableCities(); + + MJob job = prepareJob(); + + MDriverConfig driverConfig = prepareDriverConfig(job); + + List<String> extraJars = new ArrayList<>(); + extraJars.add("file:" + jarMap.get(TEST_DEPENDENCY_JAR_NAME)); + driverConfig.getListInput("jarConfig.extraJars").setValue(extraJars); + + saveJob(job); + + executeJob(job); + + stopSqoop(); + deleteJars(jarMap); + } + + private MJob prepareJob() { + MLink rdbmsConnection = getClient().createLink("generic-jdbc-connector"); + fillRdbmsLinkConfig(rdbmsConnection); + saveLink(rdbmsConnection); + + MLink testConnection = getClient().createLink("test-connector"); + saveLink(testConnection); + + MJob job = getClient().createJob(rdbmsConnection.getPersistenceId(), testConnection.getPersistenceId()); + + fillRdbmsFromConfig(job, "id"); + + return job; + } + + private MDriverConfig prepareDriverConfig(MJob job) { + MDriverConfig driverConfig = job.getDriverConfig(); + driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3); + + return driverConfig; + } + + private Map<String, String> compileTestConnectorAndDependency() throws Exception { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + if (compiler == null) { + throw new IllegalStateException( + "Cannot find the system Java compiler. 
" + + "Check that your class path includes tools.jar"); + } + + Path outputDir = Files.createTempDirectory(null); + + Map<String, JarContents> sourceFileToJarMap = new HashMap<>(); + + ClassLoader classLoader = getClass().getClassLoader(); + List<File> sourceFiles = new ArrayList<>(); + File file = new File(classLoader.getResource("TestConnector/TestConnector.java").getFile()); + sourceFiles.add(file); + file = new File(classLoader.getResource("TestConnector/TestLinkConfiguration.java").getFile()); + sourceFiles.add(file); + file = new File(classLoader.getResource("TestConnector/TestLoader.java").getFile()); + sourceFiles.add(file); + file = new File(classLoader.getResource("TestConnector/TestToDestroyer.java").getFile()); + sourceFiles.add(file); + file = new File(classLoader.getResource("TestConnector/TestToInitializer.java").getFile()); + sourceFiles.add(file); + file = new File(classLoader.getResource("TestConnector/TestToJobConfiguration.java").getFile()); + sourceFiles.add(file); + + List<File> propertiesFiles = new ArrayList<>(); + file = new File(classLoader.getResource("TestConnector/sqoopconnector.properties").getFile()); + propertiesFiles.add(file); + sourceFileToJarMap.put("test-connector.jar", new JarContents(sourceFiles, propertiesFiles)); + + sourceFiles = new ArrayList<>(); + file = new File(classLoader.getResource("TestConnector/TestDependency.java").getFile()); + sourceFiles.add(file); + sourceFileToJarMap.put("test-dependency.jar", new JarContents(sourceFiles, ListUtils.EMPTY_LIST)); + + return buildJar(outputDir.toString(), sourceFileToJarMap); + } + + private Map<String, String> buildJar(String outputDir, Map<String, JarContents> sourceFileToJarMap) throws Exception { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + StandardJavaFileManager fileManager = compiler.getStandardFileManager + (null, null, null); + + List<File> sourceFiles = new ArrayList<>(); + for(JarContents jarContents : sourceFileToJarMap.values()) { + 
sourceFiles.addAll(jarContents.sourceFiles); + } + + fileManager.setLocation(StandardLocation.CLASS_OUTPUT, + Arrays.asList(new File(outputDir.toString()))); + + Iterable<? extends JavaFileObject> compilationUnits1 = + fileManager.getJavaFileObjectsFromFiles(sourceFiles); + + boolean compiled = compiler.getTask(null, fileManager, null, null, null, compilationUnits1).call(); + if (!compiled) { + throw new RuntimeException("failed to compile"); + } + + for(Map.Entry<String, JarContents> jarNameAndContents : sourceFileToJarMap.entrySet()) { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + manifest.getMainAttributes().put(Attributes.Name.CLASS_PATH, "."); + + + JarOutputStream target = new JarOutputStream(new FileOutputStream(outputDir.toString() + File.separator + jarNameAndContents.getKey()), manifest); + List<String> classesForJar = new ArrayList<>(); + for(File sourceFile : jarNameAndContents.getValue().getSourceFiles()) { + //split the file on dot to get the filename from FILENAME.java + String fileName = sourceFile.getName().split("\\.")[0]; + classesForJar.add(fileName); + } + + File dir = new File(outputDir); + File[] directoryListing = dir.listFiles(); + for (File compiledClass : directoryListing) { + String classFileName = compiledClass.getName().split("\\$")[0].split("\\.")[0]; + if (classesForJar.contains(classFileName)){ + addFileToJar(compiledClass, target); + } + } + + for (File propertiesFile : jarNameAndContents.getValue().getProperitesFiles()) { + addFileToJar(propertiesFile, target); + } + + target.close(); + + + } + //delete non jar files + File dir = new File(outputDir); + File[] directoryListing = dir.listFiles(); + for (File file : directoryListing) { + String extension = file.getName().split("\\.")[1]; + if (!extension.equals("jar")) { + file.delete(); + } + } + + Map<String, String> jarMap = new HashMap<>(); + jarMap.put(TEST_CONNECTOR_JAR_NAME, outputDir.toString() + 
File.separator + + TEST_CONNECTOR_JAR_NAME); + jarMap.put(TEST_DEPENDENCY_JAR_NAME, outputDir.toString() + File.separator + TEST_DEPENDENCY_JAR_NAME); + return jarMap; + } + + private void addFileToJar(File source, JarOutputStream target) throws Exception { + JarEntry entry = new JarEntry(source.getName()); + entry.setTime(source.lastModified()); + target.putNextEntry(entry); + BufferedInputStream in = new BufferedInputStream(new FileInputStream(source)); + + long bufferSize = source.length(); + if (bufferSize < Integer.MIN_VALUE || bufferSize > Integer.MAX_VALUE) { + throw new RuntimeException("file to large to be added to jar"); + } + + byte[] buffer = new byte[(int) bufferSize]; + while (true) { + int count = in.read(buffer); + if (count == -1) + break; + target.write(buffer, 0, count); + } + target.closeEntry(); + if (in != null) in.close(); + } + + private void deleteJars(Map<String, String> jarMap) throws Exception { + for (String jarPath : jarMap.values()) { + (new File(jarPath)).delete(); + } + } + + @Override + public void startSqoop() throws Exception { + // Do nothing so that Sqoop isn't started before Suite. + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestConnector.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestConnector.java b/test/src/test/resources/TestConnector/TestConnector.java new file mode 100644 index 0000000..d58a824 --- /dev/null +++ b/test/src/test/resources/TestConnector/TestConnector.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.commons.collections.ListUtils; +import org.apache.sqoop.common.Direction; +import org.apache.sqoop.connector.ftp.FtpLoader; +import org.apache.sqoop.connector.ftp.FtpToDestroyer; +import org.apache.sqoop.connector.ftp.FtpToInitializer; +import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorConstants; +import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader; +import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.job.etl.From; +import org.apache.sqoop.job.etl.To; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.ResourceBundle; + +/** + * A connector that does not do anything + */ +public class TestConnector extends SqoopConnector { + + private static final To TO = new To(TestToInitializer.class, + TestLoader.class, + TestToDestroyer.class); + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public ResourceBundle getBundle(Locale locale) { + return ResourceBundle.getBundle( + GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale); + } + + @Override + public Class getLinkConfigurationClass() { + return TestLinkConfiguration.class; + } + + @Override + public Class getJobConfigurationClass(Direction direction) { + switch (direction) { + case TO: + return TestToJobConfiguration.class; + default: + return null; + } + } + + @Override + public From getFrom() { + return null; + } + + @Override + public To getTo() { + return TO; + } + + @Override + public ConnectorConfigurableUpgrader getConfigurableUpgrader() { + 
return null; + } + + @Override + public List<Direction> getSupportedDirections() { + return Arrays.asList(Direction.TO); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestDependency.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestDependency.java b/test/src/test/resources/TestConnector/TestDependency.java new file mode 100644 index 0000000..25cbe6e --- /dev/null +++ b/test/src/test/resources/TestConnector/TestDependency.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Empty class with a public no-argument constructor.
 */
public class TestDependency {

  /** Creates an instance; there is no state to initialize. */
  public TestDependency() {
  }
}
+ */ + +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class TestLinkConfiguration { + public TestLinkConfiguration() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestLoader.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestLoader.java b/test/src/test/resources/TestConnector/TestLoader.java new file mode 100644 index 0000000..831c449 --- /dev/null +++ b/test/src/test/resources/TestConnector/TestLoader.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.job.etl.Loader; +import org.apache.sqoop.job.etl.LoaderContext; + +import java.util.UUID; + +public class TestLoader extends Loader<TestLinkConfiguration, TestToJobConfiguration> { + + private long rowsWritten = 0; + + + @Override + public void load(LoaderContext context, TestLinkConfiguration linkConfiguration, + TestToJobConfiguration toJobConfig) throws Exception { + DataReader reader = context.getDataReader(); + //This will break if the TestDependency jar is not loaded + TestDependency testDependency = new TestDependency(); + while (reader.readTextRecord() != null){ + rowsWritten++; + } + } + + @Override + public long getRowsWritten() { + return rowsWritten; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestToDestroyer.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestToDestroyer.java b/test/src/test/resources/TestConnector/TestToDestroyer.java new file mode 100644 index 0000000..c4eddb2 --- /dev/null +++ b/test/src/test/resources/TestConnector/TestToDestroyer.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.sqoop.job.etl.Destroyer; +import org.apache.sqoop.job.etl.DestroyerContext; + +public class TestToDestroyer extends Destroyer<TestLinkConfiguration, TestToJobConfiguration> { + @Override + public void destroy(DestroyerContext context, TestLinkConfiguration linkConfig, + TestToJobConfiguration jobConfig) { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestToInitializer.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestToInitializer.java b/test/src/test/resources/TestConnector/TestToInitializer.java new file mode 100644 index 0000000..f62768d --- /dev/null +++ b/test/src/test/resources/TestConnector/TestToInitializer.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.sqoop.job.etl.Initializer; +import org.apache.sqoop.job.etl.InitializerContext; + +public class TestToInitializer extends Initializer<TestLinkConfiguration, TestToJobConfiguration> { + @Override + public void initialize(InitializerContext context, TestLinkConfiguration linkConfig, + TestToJobConfiguration jobConfig) { + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/TestToJobConfiguration.java ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/TestToJobConfiguration.java b/test/src/test/resources/TestConnector/TestToJobConfiguration.java new file mode 100644 index 0000000..39db77f --- /dev/null +++ b/test/src/test/resources/TestConnector/TestToJobConfiguration.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.sqoop.model.ConfigurationClass; + +@ConfigurationClass +public class TestToJobConfiguration { + public TestToJobConfiguration() { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/TestConnector/sqoopconnector.properties ---------------------------------------------------------------------- diff --git a/test/src/test/resources/TestConnector/sqoopconnector.properties b/test/src/test/resources/TestConnector/sqoopconnector.properties new file mode 100644 index 0000000..21d56e7 --- /dev/null +++ b/test/src/test/resources/TestConnector/sqoopconnector.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Test Connector Properties +org.apache.sqoop.connector.class = TestConnector +org.apache.sqoop.connector.name = test-connector http://git-wip-us.apache.org/repos/asf/sqoop/blob/d3fbe102/test/src/test/resources/classpath-tests-suite.xml ---------------------------------------------------------------------- diff --git a/test/src/test/resources/classpath-tests-suite.xml b/test/src/test/resources/classpath-tests-suite.xml new file mode 100644 index 0000000..e5a9e94 --- /dev/null +++ b/test/src/test/resources/classpath-tests-suite.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" > + +<suite name="ClasspathTests" verbose="2" parallel="false"> + + <listeners> + <listener class-name="org.apache.sqoop.test.testng.SqoopTestListener" /> + </listeners> + + <test name="ClasspathTests"> + <packages> + <package name="org.apache.sqoop.integration.classpath"/> + </packages> + </test> + +</suite>
