Repository: sqoop Updated Branches: refs/heads/sqoop2 b69fb9b68 -> 14eac41fa
SQOOP-2752: Sqoop2: Add impersonation support for kite hdfs (Dian Fu via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/14eac41f Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/14eac41f Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/14eac41f Branch: refs/heads/sqoop2 Commit: 14eac41fa39238df5677daa46880fb12de2f3c1d Parents: b69fb9b Author: Jarek Jarcec Cecho <[email protected]> Authored: Mon Dec 28 08:29:34 2015 +0100 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Mon Dec 28 08:29:34 2015 +0100 ---------------------------------------------------------------------- .../sqoop/error/code/KiteConnectorError.java | 5 ++ connector/connector-kite/pom.xml | 5 ++ .../sqoop/connector/kite/KiteExtractor.java | 45 ++++++++---- .../connector/kite/KiteFromInitializer.java | 49 ++++++++++--- .../apache/sqoop/connector/kite/KiteLoader.java | 60 ++++++++++------ .../sqoop/connector/kite/KiteToDestroyer.java | 29 ++++++-- .../sqoop/connector/kite/KiteToInitializer.java | 33 +++++++-- .../apache/sqoop/connector/kite/KiteUtils.java | 73 ++++++++++++++++++++ .../kite/configuration/ConfigUtil.java | 15 ++++ .../kite/configuration/LinkConfig.java | 4 ++ .../resources/kite-connector-config.properties | 3 + .../sqoop/connector/kite/TestKiteExtractor.java | 3 +- .../connector/kite/TestKiteFromInitializer.java | 11 ++- .../sqoop/connector/kite/TestKiteLoader.java | 3 +- .../connector/kite/TestKiteToDestroyer.java | 7 +- .../connector/kite/TestKiteToInitializer.java | 10 ++- .../connector/kite/FromRDBMSToKiteTest.java | 1 + 17 files changed, 287 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java ---------------------------------------------------------------------- diff --git 
a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java index 7db9904..170d6cb 100644 --- a/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java +++ b/common/src/main/java/org/apache/sqoop/error/code/KiteConnectorError.java @@ -33,6 +33,11 @@ public enum KiteConnectorError implements ErrorCode { /** Error occurred while creating partitions */ GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"), + /** Error occurred while adding configuration directory to classpath */ + GENERIC_KITE_CONNECTOR_0004("Error occurred while adding configuration directory to classpath"), + + GENERIC_KITE_CONNECTOR_0005("Invalid kite dataset uri"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/pom.xml ---------------------------------------------------------------------- diff --git a/connector/connector-kite/pom.xml b/connector/connector-kite/pom.xml index 0792445..a492c5b 100644 --- a/connector/connector-kite/pom.xml +++ b/connector/connector-kite/pom.xml @@ -39,6 +39,11 @@ limitations under the License. 
<scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.sqoop</groupId> + <artifactId>connector-sdk-hadoop</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java index d93f9b5..e5b2e65 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java @@ -17,12 +17,18 @@ */ package org.apache.sqoop.connector.kite; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + import com.google.common.annotations.VisibleForTesting; import org.apache.avro.generic.GenericRecord; import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.kite.configuration.ConfigUtil; import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.error.code.KiteConnectorError; import org.apache.sqoop.etl.io.DataWriter; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; @@ -47,25 +53,34 @@ public class KiteExtractor extends Extractor<LinkConfiguration, } @Override - public void extract(ExtractorContext context, LinkConfiguration linkConfig, - FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) { - String uri = 
ConfigUtil.buildDatasetUri( + public void extract(final ExtractorContext context, final LinkConfiguration linkConfig, + final FromJobConfiguration fromJobConfig, final KiteDatasetPartition partition) { + final String uri = ConfigUtil.buildDatasetUri( linkConfig.linkConfig, partition.getUri()); LOG.info("Loading data from " + uri); - KiteDatasetExecutor executor = getExecutor(uri); - DataWriter writer = context.getDataWriter(); - Object[] array; - rowsRead = 0L; - try { - while ((array = executor.readRecord()) != null) { - // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime). - writer.writeArrayRecord(array); - rowsRead++; - } - } finally { - executor.closeReader(); + try { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + KiteDatasetExecutor executor = getExecutor(uri); + DataWriter writer = context.getDataWriter(); + Object[] array; + rowsRead = 0L; + + try { + while ((array = executor.readRecord()) != null) { + // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime). 
+ writer.writeArrayRecord(array); + rowsRead++; + } + } finally { + executor.closeReader(); + } + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java index 4502d59..002286b 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java @@ -17,8 +17,11 @@ */ package org.apache.sqoop.connector.kite; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.kite.configuration.ConfigUtil; import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; @@ -31,6 +34,8 @@ import org.apache.sqoop.utils.ClassUtils; import org.kitesdk.data.Dataset; import org.kitesdk.data.Datasets; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Set; /** @@ -42,16 +47,33 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration, private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class); @Override - public void initialize(InitializerContext context, - LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { - String uri = 
ConfigUtil.buildDatasetUri( + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public void initialize(final InitializerContext context, + final LinkConfiguration linkConfig, final FromJobConfiguration fromJobConfig) { + final String uri = ConfigUtil.buildDatasetUri( linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri); LOG.debug("Constructed dataset URI: " + uri); - if (!Datasets.exists(uri)) { - LOG.error("Dataset does not exist"); - throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002); + KiteUtils.addConfigDirToClasspath(linkConfig); + try { + SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + if (!Datasets.exists(uri)) { + LOG.error("Dataset does not exist"); + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002); + } + + if (ConfigUtil.isHdfsJob(fromJobConfig.fromJobConfig)) { + // Generate delegation tokens if we are on secured cluster + SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration()); + } + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e); } } + @Override public Set<String> getJars(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { @@ -72,12 +94,23 @@ public class KiteFromInitializer extends Initializer<LinkConfiguration, return jars; } + @SuppressWarnings("rawtypes") @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) { - String uri = ConfigUtil.buildDatasetUri( + final String uri = ConfigUtil.buildDatasetUri( linkConfig.linkConfig, fromJobConfig.fromJobConfig.uri); - Dataset dataset = 
Datasets.load(uri); + Dataset dataset = null; + try { + dataset = SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Dataset>() { + public Dataset run() { + return Datasets.load(uri); + } + }); + } catch (IOException | InterruptedException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e); + } org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema(); return AvroDataTypeUtil.createSqoopSchema(avroSchema); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java index ca0a5c7..ff9aa33 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java @@ -17,13 +17,19 @@ */ package org.apache.sqoop.connector.kite; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + import com.google.common.annotations.VisibleForTesting; import org.apache.avro.generic.GenericRecord; import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.kite.configuration.ConfigUtil; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.error.code.KiteConnectorError; import org.apache.sqoop.etl.io.DataReader; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; @@ -55,32 +61,42 @@ public 
class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> { } @Override - public void load(LoaderContext context, LinkConfiguration linkConfiguration, - ToJobConfiguration toJobConfig) throws Exception { - String uri = ConfigUtil.buildDatasetUri( + public void load(final LoaderContext context, final LinkConfiguration linkConfiguration, + final ToJobConfiguration toJobConfig) throws Exception { + final String uri = ConfigUtil.buildDatasetUri( linkConfiguration.linkConfig, toJobConfig.toJobConfig); - KiteDatasetExecutor executor = getExecutor( - linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat); - LOG.info("Temporary dataset created."); - - DataReader reader = context.getDataReader(); - Object[] array; - boolean success = false; try { - while ((array = reader.readArrayRecord()) != null) { - executor.writeRecord(array); - rowsWritten++; - } - LOG.info(rowsWritten + " data record(s) have been written into dataset."); - success = true; - } finally { - executor.closeWriter(); + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + KiteDatasetExecutor executor = getExecutor( + linkConfiguration, uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat); + LOG.info("Temporary dataset created."); + + DataReader reader = context.getDataReader(); + Object[] array; + boolean success = false; + + try { + while ((array = reader.readArrayRecord()) != null) { + executor.writeRecord(array); + rowsWritten++; + } + LOG.info(rowsWritten + " data record(s) have been written into dataset."); + success = true; + } finally { + executor.closeWriter(); - if (!success) { - LOG.error("Fail to write data, dataset will be removed."); - executor.deleteDataset(); - } + if (!success) { + LOG.error("Fail to write data, dataset will be removed."); + executor.deleteDataset(); + } + } + return null; + } + }); + } catch (IOException | InterruptedException e) { + 
throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java index fb83f2b..0ac1836 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java @@ -17,13 +17,19 @@ */ package org.apache.sqoop.connector.kite; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + import com.google.common.annotations.VisibleForTesting; import org.apache.avro.generic.GenericRecord; import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.common.FileFormat; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.kite.configuration.ConfigUtil; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.error.code.KiteConnectorError; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; import org.apache.sqoop.schema.Schema; @@ -42,16 +48,25 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration, private static final Logger LOG = Logger.getLogger(KiteToDestroyer.class); @Override - public void destroy(DestroyerContext context, - LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) { + public void destroy(final DestroyerContext context, + final LinkConfiguration linkConfig, final ToJobConfiguration 
toJobConfig) { LOG.info("Running Kite connector destroyer"); - String uri = ConfigUtil.buildDatasetUri( + final String uri = ConfigUtil.buildDatasetUri( linkConfig.linkConfig, toJobConfig.toJobConfig); - if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) { - destroyHBaseJob(context, uri, toJobConfig); - } else { - destroyHdfsJob(context, uri, toJobConfig); + try { + SecurityUtils.createProxyUserAndLoadDelegationTokens(context).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + if (ConfigUtil.isHBaseJob(toJobConfig.toJobConfig)) { + destroyHBaseJob(context, uri, toJobConfig); + } else { + destroyHdfsJob(context, uri, toJobConfig); + } + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected exception", e); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java index effab19..4fc2687 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java @@ -17,8 +17,11 @@ */ package org.apache.sqoop.connector.kite; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.hadoop.security.SecurityUtils; import org.apache.sqoop.connector.kite.configuration.ConfigUtil; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import 
org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; @@ -30,6 +33,8 @@ import org.apache.sqoop.schema.Schema; import org.apache.sqoop.utils.ClassUtils; import org.kitesdk.data.Datasets; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.Set; /** @@ -43,14 +48,30 @@ public class KiteToInitializer extends Initializer<LinkConfiguration, private static final Logger LOG = Logger.getLogger(KiteToInitializer.class); @Override - public void initialize(InitializerContext context, - LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) { - String uri = ConfigUtil.buildDatasetUri( + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public void initialize(final InitializerContext context, + final LinkConfiguration linkConfig, final ToJobConfiguration toJobConfig) { + final String uri = ConfigUtil.buildDatasetUri( linkConfig.linkConfig, toJobConfig.toJobConfig); LOG.debug("Constructed dataset URI: " + uri); - if (Datasets.exists(uri)) { - LOG.error("Overwrite an existing dataset is not expected in new create mode."); - throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001); + KiteUtils.addConfigDirToClasspath(linkConfig); + try { + SecurityUtils.createProxyUser(context).doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws Exception { + if (Datasets.exists(uri)) { + LOG.error("Overwrite an existing dataset is not expected in new create mode."); + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001); + } + + if (ConfigUtil.isHdfsJob(toJobConfig.toJobConfig)) { + // Generate delegation tokens if we are on secured cluster + SecurityUtils.generateDelegationTokens(context.getContext(), new Path(ConfigUtil.removeDatasetPrefix(uri)), new Configuration()); + } + return null; + } + }); + } catch (IOException | InterruptedException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0005, "Unexpected 
exception", e); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java new file mode 100644 index 0000000..fc6177d --- /dev/null +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteUtils.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.connector.kite; + +import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.error.code.KiteConnectorError; + +/** + * Utilities for Kite. + */ +public class KiteUtils { + + private static final Logger LOG = Logger.getLogger(KiteUtils.class); + + private static final String DEFAULT_HADOOP_CONF_DIR = "/etc/hadoop/conf"; + + @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"}) + public static void addConfigDirToClasspath(final LinkConfiguration linkConfig) { + File configDir = new File(getConfDir(linkConfig)); + try { + final Method method = URLClassLoader.class.getDeclaredMethod("addURL", new Class[]{URL.class}); + AccessController.doPrivileged(new PrivilegedAction<Void>() { + @Override + public Void run() { + method.setAccessible(true); + return null; + } + }); + method.invoke(ClassLoader.getSystemClassLoader(), new Object[]{configDir.toURI().toURL()}); + } catch (NoSuchMethodException | SecurityException + | InvocationTargetException | IllegalAccessException + | IllegalArgumentException | MalformedURLException e) { + throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0004, e); + } + LOG.debug("Added file " + configDir + " to classpath"); + } + + private static String getConfDir(LinkConfiguration linkConfig) { + String confDir = linkConfig.linkConfig.confDir; + if (StringUtils.isBlank(confDir)) { + confDir = DEFAULT_HADOOP_CONF_DIR; + } + return confDir; + } +} 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java index e63bccf..3393c6e 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java @@ -68,4 +68,19 @@ public class ConfigUtil { return toJobConfig.uri.startsWith("dataset:hbase:"); } + public static boolean isHdfsJob(ToJobConfig toJobConfig) { + return toJobConfig.uri.startsWith("dataset:hdfs:"); + } + + public static boolean isHdfsJob(FromJobConfig fromJobConfig) { + return fromJobConfig.uri.startsWith("dataset:hdfs:"); + } + + public static String removeDatasetPrefix(String uri) { + if (uri.startsWith("dataset:")) { + return uri.substring("dataset:".length()); + } else { + return uri; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java index ee31f15..508d63e 100644 --- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java +++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java @@ -23,6 +23,7 @@ import org.apache.sqoop.model.Input; import 
org.apache.sqoop.model.Validator; import org.apache.sqoop.validation.Status; import org.apache.sqoop.validation.validators.AbstractValidator; +import org.apache.sqoop.validation.validators.DirectoryExistsValidator; import org.apache.sqoop.validation.validators.HostAndPortValidator; @ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)}) @@ -31,6 +32,9 @@ public class LinkConfig { @Input(size = 255) public String authority; + @Input(size = 255, validators = { @Validator(DirectoryExistsValidator.class)}) + public String confDir; + public static class ConfigValidator extends AbstractValidator<LinkConfig> { @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/main/resources/kite-connector-config.properties ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties index c134ac3..f384f3d 100644 --- a/connector/connector-kite/src/main/resources/kite-connector-config.properties +++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties @@ -25,6 +25,9 @@ linkConfig.help = You must supply the information requested in order to create a linkConfig.authority.label = HDFS host and port linkConfig.authority.help = Optional to override HDFS file system location. +linkConfig.confDir.label = Hadoop conf directory +linkConfig.confDir.help = Directory with Hadoop configuration files. This directory will be added to the classpath. 
+ # To Job Config # toJobConfig.label = To Kite Dataset Configuration http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java index c49be92..4da9218 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java @@ -18,6 +18,7 @@ package org.apache.sqoop.connector.kite; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.etl.io.DataWriter; @@ -73,7 +74,7 @@ public class TestKiteExtractor { // setup Schema schema = new Schema("testExtractor"); schema.addColumn(new Text("TextCol")); - ExtractorContext context = new ExtractorContext(null, writerMock, schema, "test_user"); + ExtractorContext context = new ExtractorContext(new MutableMapContext(), writerMock, schema, "test_user"); LinkConfiguration linkConfig = new LinkConfiguration(); FromJobConfiguration jobConfig = new FromJobConfiguration(); KiteDatasetPartition partition = new KiteDatasetPartition(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java index 
6df5d83..5e301bf 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java @@ -18,9 +18,12 @@ package org.apache.sqoop.connector.kite; +import org.apache.sqoop.common.MutableContext; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; +import org.apache.sqoop.job.etl.InitializerContext; import org.kitesdk.data.Datasets; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -33,10 +36,11 @@ import static org.mockito.MockitoAnnotations.initMocks; import static org.powermock.api.mockito.PowerMockito.mockStatic; @PrepareForTest(Datasets.class) -@PowerMockIgnore("org.apache.sqoop.common.ErrorCode") +@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"}) public class TestKiteFromInitializer extends PowerMockTestCase { private KiteFromInitializer initializer; + private InitializerContext initializerContext; @BeforeMethod(alwaysRun = true) public void setUp() { @@ -44,6 +48,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase { mockStatic(Datasets.class); initializer = new KiteFromInitializer(); + initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); } @Test @@ -54,7 +59,7 @@ public class TestKiteFromInitializer extends PowerMockTestCase { when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true); // exercise - initializer.initialize(null, new LinkConfiguration(), jobConfig); + initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig); } @Test(expectedExceptions = SqoopException.class) @@ -65,7 +70,7 @@ public class 
TestKiteFromInitializer extends PowerMockTestCase { when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false); // exercise - initializer.initialize(null, new LinkConfiguration(), jobConfig); + initializer.initialize(initializerContext, new LinkConfiguration(), jobConfig); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java index c5aa1bd..c4ab867 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java @@ -18,6 +18,7 @@ package org.apache.sqoop.connector.kite; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.common.FileFormat; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; @@ -81,7 +82,7 @@ public class TestKiteLoader { return null; } }; - LoaderContext context = new LoaderContext(null, reader, schema, "test_user"); + LoaderContext context = new LoaderContext(new MutableMapContext(), reader, schema, "test_user"); LinkConfiguration linkConfig = new LinkConfiguration(); ToJobConfiguration toJobConfig = new ToJobConfiguration(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java 
b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java index 00b8871..cf26986 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java @@ -18,6 +18,7 @@ package org.apache.sqoop.connector.kite; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.connector.common.FileFormat; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; @@ -38,7 +39,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.verifyStatic; @PrepareForTest({KiteDatasetExecutor.class, Datasets.class}) -@PowerMockIgnore("org.apache.sqoop.common.ErrorCode") +@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"}) public class TestKiteToDestroyer extends PowerMockTestCase { private KiteToDestroyer destroyer; @@ -78,7 +79,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase { @Test public void testDestroyForSuccessfulJob() { // setup - DestroyerContext context = new DestroyerContext(null, true, null, user); + DestroyerContext context = new DestroyerContext(new MutableMapContext(), true, null, user); when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri)) .thenReturn(expectedUris); @@ -94,7 +95,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase { @Test public void testDestroyForFailedJob() { // setup - DestroyerContext context = new DestroyerContext(null, false, null, user); + DestroyerContext context = new DestroyerContext(new MutableMapContext(), false, null, user); when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri)) .thenReturn(expectedUris); for (String uri : expectedUris) { 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java index 5230ffe..e5bb667 100644 --- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java +++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java @@ -18,9 +18,11 @@ package org.apache.sqoop.connector.kite; +import org.apache.sqoop.common.MutableMapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.kite.configuration.LinkConfiguration; import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration; +import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.schema.Schema; import org.kitesdk.data.Datasets; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -36,10 +38,11 @@ import static org.mockito.MockitoAnnotations.initMocks; import static org.powermock.api.mockito.PowerMockito.mockStatic; @PrepareForTest(Datasets.class) -@PowerMockIgnore("org.apache.sqoop.common.ErrorCode") +@PowerMockIgnore({"org.apache.sqoop.common.ErrorCode", "com.sun.security.auth.UnixPrincipal"}) public class TestKiteToInitializer extends PowerMockTestCase { private KiteToInitializer initializer; + private InitializerContext initializerContext; @BeforeMethod(alwaysRun = true) public void setUp() { @@ -47,6 +50,7 @@ public class TestKiteToInitializer extends PowerMockTestCase { mockStatic(Datasets.class); initializer = new KiteToInitializer(); + initializerContext = new InitializerContext(new MutableMapContext(), "test_user"); } @Test @@ -59,7 +63,7 @@ public class TestKiteToInitializer 
extends PowerMockTestCase { .thenReturn(false); // exercise - initializer.initialize(null, linkConfig, toJobConfig); + initializer.initialize(initializerContext, linkConfig, toJobConfig); } @Test(expectedExceptions = SqoopException.class) @@ -72,7 +76,7 @@ public class TestKiteToInitializer extends PowerMockTestCase { .thenReturn(true); // exercise - initializer.initialize(null, linkConfig, toJobConfig); + initializer.initialize(initializerContext, linkConfig, toJobConfig); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/14eac41f/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java index 7b2aced..be9fef1 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/kite/FromRDBMSToKiteTest.java @@ -64,6 +64,7 @@ public class FromRDBMSToKiteTest extends ConnectorTestCase { // Kite link MLink kiteLink = getClient().createLink("kite-connector"); kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.authority").setValue(hdfsClient.getUri().getAuthority()); + kiteLink.getConnectorLinkConfig().getStringInput("linkConfig.confDir").setValue(getCluster().getConfigurationPath()); saveLink(kiteLink); // Job creation
