http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java index 43e6463..27afd8c 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java @@ -41,154 +41,154 @@ import org.apache.sqoop.job.PrefixContext; * Extract from HDFS. * Default field delimiter of a record is comma. */ -public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> { - - public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class); - - private Configuration conf; - private DataWriter dataWriter; - private long rowRead = 0; - - @Override - public void extract(ExtractorContext context, - ConnectionConfiguration connectionConfiguration, - ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) { - - conf = ((PrefixContext) context.getContext()).getConfiguration(); - dataWriter = context.getDataWriter(); - - try { - HdfsExportPartition p = partition; - LOG.info("Working on partition: " + p); - int numFiles = p.getNumberOfFiles(); - for (int i = 0; i < numFiles; i++) { - extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); - } - } catch (IOException e) { - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); - } - } - - private void extractFile(Path file, long start, long length) - throws IOException { - long end = start + length; - LOG.info("Extracting file " + file); - LOG.info("\t from offset " + start); - LOG.info("\t to offset " + end); - LOG.info("\t of length " + length); - if(isSequenceFile(file)) { - extractSequenceFile(file, start, length); - } else { - extractTextFile(file, start, length); - } - } - - /** - * Extracts Sequence file - * @param file - * @param start - * @param length - * @throws IOException - */ - private void extractSequenceFile(Path file, long start, long length) - throws IOException { - LOG.info("Extracting sequence file"); - long end = start + length; - SequenceFile.Reader filereader = new SequenceFile.Reader( - file.getFileSystem(conf), file, conf); - - if (start > filereader.getPosition()) { - filereader.sync(start); // sync to start - } - - Text line = new Text(); - boolean hasNext = filereader.next(line); - while (hasNext) { - rowRead++; - dataWriter.writeStringRecord(line.toString()); - line = new Text(); - hasNext = filereader.next(line); - if (filereader.getPosition() >= end && filereader.syncSeen()) { - break; - } - } - filereader.close(); - } - - /** - * Extracts Text file - * @param file - * @param start - * @param length - * @throws IOException - */ - private void extractTextFile(Path file, long start, long length) - throws IOException { - LOG.info("Extracting text file"); - long end = start + length; - FileSystem fs = file.getFileSystem(conf); - FSDataInputStream filestream = fs.open(file); - CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); - LineReader filereader; - Seekable fileseeker = filestream; - - // Hadoop 1.0 does not have support for custom record delimiter and thus - // we - // are supporting only default one. - // We might add another "else if" case for SplittableCompressionCodec once - // we drop support for Hadoop 1.0. - if (codec == null) { - filestream.seek(start); - filereader = new LineReader(filestream); - } else { - filereader = new LineReader(codec.createInputStream(filestream, - codec.createDecompressor()), conf); - fileseeker = filestream; - } - if (start != 0) { - // always throw away first record because - // one extra line is read in previous split - start += filereader.readLine(new Text(), 0); - } - int size; - LOG.info("Start position: " + String.valueOf(start)); - long next = start; - while (next <= end) { - Text line = new Text(); - size = filereader.readLine(line, Integer.MAX_VALUE); - if (size == 0) { - break; - } - if (codec == null) { - next += size; - } else { - next = fileseeker.getPos(); - } - rowRead++; - dataWriter.writeStringRecord(line.toString()); - } - LOG.info("Extracting ended on position: " + fileseeker.getPos()); - filestream.close(); - } - - @Override - public long getRowsRead() { - return rowRead; - } - - /** - * Returns true if given file is sequence - * @param file - * @return boolean - */ - private boolean isSequenceFile(Path file) { - SequenceFile.Reader filereader = null; - try { - filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf); - filereader.close(); - } catch (IOException e) { - return false; - } - return true; - } -} +//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> { +// +// public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class); +// +// private Configuration conf; +// private DataWriter dataWriter; +// private long rowRead = 0; +// +// @Override +// public void extract(ExtractorContext context, +// ConnectionConfiguration connectionConfiguration, +// ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) { +// +// conf = ((PrefixContext) context.getContext()).getConfiguration(); +// dataWriter = context.getDataWriter(); +// +// try { +// HdfsExportPartition p = partition; +// LOG.info("Working on partition: " + p); +// int numFiles = p.getNumberOfFiles(); +// for (int i = 0; i < numFiles; i++) { +// extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); +// } +// } catch (IOException e) { +// throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); +// } +// } +// +// private void extractFile(Path file, long start, long length) +// throws IOException { +// long end = start + length; +// LOG.info("Extracting file " + file); +// LOG.info("\t from offset " + start); +// LOG.info("\t to offset " + end); +// LOG.info("\t of length " + length); +// if(isSequenceFile(file)) { +// extractSequenceFile(file, start, length); +// } else { +// extractTextFile(file, start, length); +// } +// } +// +// /** +// * Extracts Sequence file +// * @param file +// * @param start +// * @param length +// * @throws IOException +// */ +// private void extractSequenceFile(Path file, long start, long length) +// throws IOException { +// LOG.info("Extracting sequence file"); +// long end = start + length; +// SequenceFile.Reader filereader = new SequenceFile.Reader( +// file.getFileSystem(conf), file, conf); +// +// if (start > filereader.getPosition()) { +// filereader.sync(start); // sync to start +// } +// +// Text line = new Text(); +// boolean hasNext = filereader.next(line); +// while (hasNext) { +// rowRead++; +// dataWriter.writeStringRecord(line.toString()); +// line = new Text(); +// hasNext = filereader.next(line); +// if (filereader.getPosition() >= end && filereader.syncSeen()) { +// break; +// } +// } +// filereader.close(); +// } +// +// /** +// * Extracts Text file +// * @param file +// * @param start +// * @param length +// * @throws IOException +// */ +// private void extractTextFile(Path file, long start, long length) +// throws IOException { +// LOG.info("Extracting text file"); +// long end = start + length; +// FileSystem fs = file.getFileSystem(conf); +// FSDataInputStream filestream = fs.open(file); +// CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); +// LineReader filereader; +// Seekable fileseeker = filestream; +// +// // Hadoop 1.0 does not have support for custom record delimiter and thus +// // we +// // are supporting only default one. +// // We might add another "else if" case for SplittableCompressionCodec once +// // we drop support for Hadoop 1.0. +// if (codec == null) { +// filestream.seek(start); +// filereader = new LineReader(filestream); +// } else { +// filereader = new LineReader(codec.createInputStream(filestream, +// codec.createDecompressor()), conf); +// fileseeker = filestream; +// } +// if (start != 0) { +// // always throw away first record because +// // one extra line is read in previous split +// start += filereader.readLine(new Text(), 0); +// } +// int size; +// LOG.info("Start position: " + String.valueOf(start)); +// long next = start; +// while (next <= end) { +// Text line = new Text(); +// size = filereader.readLine(line, Integer.MAX_VALUE); +// if (size == 0) { +// break; +// } +// if (codec == null) { +// next += size; +// } else { +// next = fileseeker.getPos(); +// } +// rowRead++; +// dataWriter.writeStringRecord(line.toString()); +// } +// LOG.info("Extracting ended on position: " + fileseeker.getPos()); +// filestream.close(); +// } +// +// @Override +// public long getRowsRead() { +// return rowRead; +// } +// +// /** +// * Returns true if given file is sequence +// * @param file +// * @return boolean +// */ +// private boolean isSequenceFile(Path file) { +// SequenceFile.Reader filereader = null; +// try { +// filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf); +// filereader.close(); +// } catch (IOException e) { +// return false; +// } +// return true; +// } +//}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java index bd11323..c60ae68 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java @@ -22,10 +22,10 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.PropertyConfigurator; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.json.util.SchemaSerialization; import org.apache.sqoop.model.FormUtils; -import org.apache.sqoop.model.MJob; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.utils.ClassUtils; import org.json.simple.JSONObject; @@ -40,59 +40,59 @@ import java.util.Properties; */ public final class ConfigurationUtils { - private static final String JOB_TYPE = JobConstants.PREFIX_JOB_CONFIG + "type"; + private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.connection"; - private static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.connection"; + private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.connection"; - private static final String JOB_CONFIG_CLASS_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.job"; + private static final String JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job"; - private static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.connection"; + private static final String JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job"; + + private static final String JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.from.connection"; + + private static final String JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.to.connection"; private static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.job"; - private static final String JOB_CONFIG_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.connection"; + private static final String JOB_CONFIG_FROM_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.connection"; + + private static final Text JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_CONNECTION); + + private static final String JOB_CONFIG_TO_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.connection"; + + private static final Text JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_CONNECTION); + + private static final String JOB_CONFIG_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job"; + + private static final Text JOB_CONFIG_FROM_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_FROM_CONNECTOR_JOB); - private static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_CONNECTOR_CONNECTION); + private static final String JOB_CONFIG_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job"; - private static final String JOB_CONFIG_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.job"; + private static final Text JOB_CONFIG_TO_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_TO_CONNECTOR_JOB); - private static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_CONNECTOR_JOB); + private static final String JOB_CONFIG_FROM_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.from.connection"; - private static final String JOB_CONFIG_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.connection"; + private static final Text JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION); - private static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FRAMEWORK_CONNECTION); + private static final String JOB_CONFIG_TO_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.from.connection"; + + private static final Text JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_TO_FRAMEWORK_CONNECTION); private static final String JOB_CONFIG_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.framework.job"; private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB); - private static final String SCHEMA_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector"; + private static final String SCHEMA_FROM_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from"; - private static final Text SCHEMA_CONNECTOR_KEY = new Text(SCHEMA_CONNECTOR); + private static final Text SCHEMA_FROM_CONNECTOR_KEY = new Text(SCHEMA_FROM_CONNECTOR); - private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio"; + private static final String SCHEMA_TO_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to"; - private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO); + private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR); - /** - * Persist job type in the configuration object. - * - * @param configuration MapReduce configuration object - * @param type Job type - */ - public static void setJobType(Configuration configuration, MJob.Type type) { - configuration.set(JOB_TYPE, type.name()); - } + private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio"; - /** - * Retrieve job type. - * - * @param configuration MapReduce configuration object - * @return Job type - */ - public static MJob.Type getJobType(Configuration configuration) { - return MJob.Type.valueOf(configuration.get(JOB_TYPE)); - } + private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO); /** * Persist Connector configuration object for connection. @@ -100,20 +100,38 @@ public final class ConfigurationUtils { * @param job MapReduce job object * @param obj Configuration object */ - public static void setConfigConnectorConnection(Job job, Object obj) { - job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, obj.getClass().getName()); - job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + public static void setConnectorConnectionConfig(ConnectorType type, Job job, Object obj) { + switch (type) { + case FROM: + job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + break; + + case TO: + job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + break; + } } /** - * Persist Connector configuration object for job. + * Persist Connector configuration objects for job. * * @param job MapReduce job object * @param obj Configuration object */ - public static void setConfigConnectorJob(Job job, Object obj) { - job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_JOB, obj.getClass().getName()); - job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes()); + public static void setConnectorJobConfig(ConnectorType type, Job job, Object obj) { + switch (type) { + case FROM: + job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_FROM_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes()); + break; + + case TO: + job.getConfiguration().set(JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_TO_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes()); + break; + } } /** @@ -122,9 +140,18 @@ public final class ConfigurationUtils { * @param job MapReduce job object * @param obj Configuration object */ - public static void setConfigFrameworkConnection(Job job, Object obj) { - job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, obj.getClass().getName()); - job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + public static void setFrameworkConnectionConfig(ConnectorType type, Job job, Object obj) { + switch (type) { + case FROM: + job.getConfiguration().set(JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + break; + + case TO: + job.getConfiguration().set(JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, obj.getClass().getName()); + job.getCredentials().addSecretKey(JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes()); + break; + } } /** @@ -144,8 +171,16 @@ public final class ConfigurationUtils { * @param configuration MapReduce configuration object * @return Configuration object */ - public static Object getConfigConnectorConnection(Configuration configuration) { - return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, JOB_CONFIG_CONNECTOR_CONNECTION_KEY); + public static Object getConnectorConnectionConfig(ConnectorType type, Configuration configuration) { + switch (type) { + case FROM: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_CONNECTION, JOB_CONFIG_FROM_CONNECTOR_CONNECTION_KEY); + + case TO: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_CONNECTION, JOB_CONFIG_TO_CONNECTOR_CONNECTION_KEY); + } + + return null; } /** @@ -154,8 +189,16 @@ public final class ConfigurationUtils { * @param configuration MapReduce configuration object * @return Configuration object */ - public static Object getConfigConnectorJob(Configuration configuration) { - return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_JOB, JOB_CONFIG_CONNECTOR_JOB_KEY); + public static Object getConnectorJobConfig(ConnectorType type, Configuration configuration) { + switch (type) { + case FROM: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, JOB_CONFIG_FROM_CONNECTOR_JOB_KEY); + + case TO: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, JOB_CONFIG_TO_CONNECTOR_JOB_KEY); + } + + return null; } /** @@ -164,8 +207,16 @@ public final class ConfigurationUtils { * @param configuration MapReduce configuration object * @return Configuration object */ - public static Object getConfigFrameworkConnection(Configuration configuration) { - return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, JOB_CONFIG_FRAMEWORK_CONNECTION_KEY); + public static Object getFrameworkConnectionConfig(ConnectorType type, Configuration configuration) { + switch (type) { + case FROM: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FROM_FRAMEWORK_CONNECTION, JOB_CONFIG_FROM_FRAMEWORK_CONNECTION_KEY); + + case TO: + return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_TO_FRAMEWORK_CONNECTION, JOB_CONFIG_TO_FRAMEWORK_CONNECTION_KEY); + } + + return null; } /** @@ -179,47 +230,57 @@ public final class ConfigurationUtils { } /** - * Persist Connector generated schema. + * Persist From Connector generated schema. * * @param job MapReduce Job object * @param schema Schema */ - public static void setConnectorSchema(Job job, Schema schema) { + public static void setFromConnectorSchema(Job job, Schema schema) { if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + job.getCredentials().addSecretKey(SCHEMA_FROM_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); } } /** - * Persist Framework generated schema. + * Persist To Connector generated schema. * * @param job MapReduce Job object * @param schema Schema */ - public static void setHioSchema(Job job, Schema schema) { + public static void setToConnectorSchema(Job job, Schema schema) { if(schema != null) { - job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + job.getCredentials().addSecretKey(SCHEMA_TO_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); } } /** - * Retrieve Connector generated schema. + * Persist Framework generated schema. * - * @param configuration MapReduce configuration object - * @return Schema + * @param job MapReduce Job object + * @param schema Schema */ - public static Schema getConnectorSchema(Configuration configuration) { - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_CONNECTOR_KEY)); + public static void setHioSchema(Job job, Schema schema) { + if(schema != null) { + job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes()); + } } /** - * Retrieve Framework generated schema. + * Retrieve From Connector generated schema. * * @param configuration MapReduce configuration object * @return Schema */ - public static Schema getHioSchema(Configuration configuration) { - return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_HIO_KEY)); + public static Schema getConnectorSchema(ConnectorType type, Configuration configuration) { + switch (type) { + case FROM: + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_CONNECTOR_KEY)); + + case TO: + return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_CONNECTOR_KEY)); + } + + return null; } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index e1a95a7..b4e9c2b 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -19,10 +19,12 @@ package org.apache.sqoop.job.mr; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Destroyer; import org.apache.sqoop.job.etl.DestroyerContext; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.utils.ClassUtils; @@ -51,18 +53,18 @@ public class SqoopDestroyerExecutor { } // Objects that should be pass to the Destroyer execution - PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT); - Object configConnection = ConfigurationUtils.getConfigConnectorConnection(configuration); - Object configJob = ConfigurationUtils.getConfigConnectorJob(configuration); + PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, configuration); + Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, configuration); // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between HIO and Connector schema - Schema schema = ConfigurationUtils.getConnectorSchema(configuration); + // TODO: Change to coditional choosing between Connector schemas. + Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, configuration); DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema); LOG.info("Executing destroyer class " + destroyer.getClass()); - destroyer.destroy(destroyerContext, configConnection, configJob); + destroyer.destroy(destroyerContext, fromConfigConnection, fromConfigJob); } private SqoopDestroyerExecutor() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index 6891258..4bd7bce 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.job.MapreduceExecutionError; @@ -36,6 +37,7 @@ import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; import org.apache.sqoop.job.etl.PartitionerContext; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.utils.ClassUtils; @@ -61,10 +63,10 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER); Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName); - PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - Object connectorConnection = ConfigurationUtils.getConfigConnectorConnection(conf); - Object connectorJob = ConfigurationUtils.getConfigConnectorJob(conf); - Schema schema = ConfigurationUtils.getConnectorSchema(conf); + PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf); + Object connectorJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf); + Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10); PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index 645dbc6..2daaee3 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.IntermediateDataFormat; import org.apache.sqoop.job.JobConstants; @@ -34,6 +35,7 @@ import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.etl.ExtractorContext; import org.apache.sqoop.etl.io.DataWriter; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.submission.counter.SqoopCounters; @@ -75,24 +77,13 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, Object configJob = null; // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between HIO and Connector schema - Schema schema = ConfigurationUtils.getConnectorSchema(conf); - - // Executor is in connector space for IMPORT and in framework space for EXPORT - switch (ConfigurationUtils.getJobType(conf)) { - case IMPORT: - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - configConnection = ConfigurationUtils.getConfigConnectorConnection(conf); - configJob = ConfigurationUtils.getConfigConnectorJob(conf); - break; - case EXPORT: - subContext = new PrefixContext(conf, ""); - configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf); - configJob = ConfigurationUtils.getConfigFrameworkJob(conf); - break; - default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); - } + // TODO: Change to coditional choosing between Connector schemas. + Schema schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); + + // Get configs for extractor + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.FROM, conf); + configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.FROM, conf); SqoopSplit split = context.getCurrentKey(); ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 6efadf6..123737e 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; import org.apache.sqoop.connector.idf.IntermediateDataFormat; @@ -39,6 +40,7 @@ import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.etl.LoaderContext; import org.apache.sqoop.etl.io.DataReader; +import org.apache.sqoop.model.MConnector; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.job.io.SqoopWritable; import org.apache.sqoop.utils.ClassUtils; @@ -225,23 +227,13 @@ public class SqoopOutputFormatLoadExecutor { if (!isTest) { // Propagate connector schema in every case for now - // TODO: Change to coditional choosing between HIO and Connector schema - schema = ConfigurationUtils.getConnectorSchema(conf); + // TODO: Change to coditional choosing between Connector schemas. + // @TODO(Abe): Maybe use TO schema? + schema = ConfigurationUtils.getConnectorSchema(ConnectorType.FROM, conf); - switch (ConfigurationUtils.getJobType(conf)) { - case EXPORT: - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - configConnection = ConfigurationUtils.getConfigConnectorConnection(conf); - configJob = ConfigurationUtils.getConfigConnectorJob(conf); - break; - case IMPORT: - subContext = new PrefixContext(conf, ""); - configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf); - configJob = ConfigurationUtils.getConfigFrameworkJob(conf); - break; - default: - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); - } + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnectionConfig(ConnectorType.TO, conf); + configJob = ConfigurationUtils.getConnectorJobConfig(ConnectorType.TO, conf); } // Create loader context http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index 3ce3a6a..e460c3e 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -36,7 +36,7 @@ import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.Job; //import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat; -import org.apache.sqoop.job.etl.HdfsExportExtractor; +//import org.apache.sqoop.job.etl.HdfsExportExtractor; import org.apache.sqoop.job.etl.HdfsExportPartitioner; import org.apache.sqoop.job.etl.HdfsSequenceImportLoader; import org.apache.sqoop.job.etl.Loader; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 5bce3a9..2359a06 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -40,8 +40,8 @@ import javax.sql.DataSource; import org.apache.log4j.Logger; import org.apache.commons.lang.StringUtils; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; -import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.model.MBooleanInput; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MConnectionForms; @@ -117,11 +117,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { registerForms(null, null, mf.getConnectionForms().getForms(), MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); - // Register all jobs - for (MJobForms jobForms : mf.getAllJobsForms().values()) { - registerForms(null, jobForms.getType(), jobForms.getForms(), - MFormType.JOB.name(), baseFormStmt, baseInputStmt); - } + // Register job forms + registerForms(null, null, mf.getJobForms().getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex); @@ -153,10 +151,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); // Register all jobs - for (MJobForms jobForms : mc.getAllJobsForms().values()) { - registerForms(connectorId, jobForms.getType(), jobForms.getForms(), - MFormType.JOB.name(), baseFormStmt, baseInputStmt); - } + registerForms(connectorId, ConnectorType.FROM, mc.getJobForms(ConnectorType.FROM).getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); + registerForms(connectorId, ConnectorType.TO, mc.getJobForms(ConnectorType.TO).getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); } catch (SQLException ex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0014, @@ -513,10 +511,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); // Register all jobs - for (MJobForms jobForms : mf.getAllJobsForms().values()) { - registerForms(null, jobForms.getType(), jobForms.getForms(), - MFormType.JOB.name(), baseFormStmt, baseInputStmt); - } + registerForms(null, null, mf.getJobForms().getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); // We're using hardcoded value for framework metadata as they are // represented as NULL in the database. @@ -544,8 +540,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { inputFetchStmt = conn.prepareStatement(STMT_FETCH_INPUT); List<MForm> connectionForms = new ArrayList<MForm>(); - Map<MJob.Type, List<MForm>> jobForms = - new HashMap<MJob.Type, List<MForm>>(); + List<MForm> jobForms = new ArrayList<MForm>(); loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1); @@ -555,7 +550,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } mf = new MFramework(new MConnectionForms(connectionForms), - convertToJobList(jobForms), detectFrameworkVersion(conn)); + new MJobForms(jobForms), detectFrameworkVersion(conn)); // We're using hardcoded value for framework metadata as they are // represented as NULL in the database. @@ -931,8 +926,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { stmt = conn.prepareStatement(STMT_INSERT_JOB, Statement.RETURN_GENERATED_KEYS); stmt.setString(1, job.getName()); - stmt.setLong(2, job.getConnectionId()); - stmt.setString(3, job.getType().name()); + stmt.setLong(2, job.getConnectionId(ConnectorType.FROM)); + stmt.setLong(3, job.getConnectionId(ConnectorType.TO)); stmt.setBoolean(4, job.getEnabled()); stmt.setString(5, job.getCreationUser()); stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime())); @@ -955,12 +950,16 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { createInputValues(STMT_INSERT_JOB_INPUT, jobId, - job.getConnectorPart().getForms(), + job.getConnectorPart(ConnectorType.FROM).getForms(), conn); createInputValues(STMT_INSERT_JOB_INPUT, jobId, job.getFrameworkPart().getForms(), conn); + createInputValues(STMT_INSERT_JOB_INPUT, + jobId, + job.getConnectorPart(ConnectorType.TO).getForms(), + conn); job.setPersistenceId(jobId); @@ -997,12 +996,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { // And reinsert new values createInputValues(STMT_INSERT_JOB_INPUT, job.getPersistenceId(), - job.getConnectorPart().getForms(), + job.getConnectorPart(ConnectorType.FROM).getForms(), conn); createInputValues(STMT_INSERT_JOB_INPUT, - job.getPersistenceId(), - job.getFrameworkPart().getForms(), - conn); + job.getPersistenceId(), + job.getFrameworkPart().getForms(), + conn); } catch (SQLException ex) { logException(ex, job); @@ -1620,14 +1619,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { formFetchStmt.setLong(1, connectorId); List<MForm> connectionForms = new ArrayList<MForm>(); - Map<MJob.Type, List<MForm>> jobForms = - new HashMap<MJob.Type, List<MForm>>(); + Map<ConnectorType, List<MForm>> jobForms = new HashMap<ConnectorType, List<MForm>>(); - loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1); + loadConnectorForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1); MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion, - new MConnectionForms(connectionForms), - convertToJobList(jobForms)); + new MConnectionForms(connectionForms), + new MJobForms(jobForms.get(ConnectorType.FROM)), + new MJobForms(jobForms.get(ConnectorType.TO))); mc.setPersistenceId(connectorId); connectors.add(mc); @@ -1674,13 +1673,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { List<MForm> connectorConnForms = new ArrayList<MForm>(); List<MForm> frameworkConnForms = new ArrayList<MForm>(); + List<MForm> frameworkJobForms = new ArrayList<MForm>(); + Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>(); - Map<MJob.Type, List<MForm>> connectorJobForms - = new HashMap<MJob.Type, List<MForm>>(); - Map<MJob.Type, List<MForm>> frameworkJobForms - = new HashMap<MJob.Type, List<MForm>>(); - - loadForms(connectorConnForms, connectorJobForms, + loadConnectorForms(connectorConnForms, connectorJobForms, formConnectorFetchStmt, inputFetchStmt, 2); loadForms(frameworkConnForms, frameworkJobForms, formFrameworkFetchStmt, inputFetchStmt, 2); @@ -1725,20 +1721,19 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { inputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT); while(rsJob.next()) { - long connectorId = rsJob.getLong(1); - long id = rsJob.getLong(2); - String name = rsJob.getString(3); - long connectionId = rsJob.getLong(4); - String stringType = rsJob.getString(5); - boolean enabled = rsJob.getBoolean(6); - String createBy = rsJob.getString(7); - Date creationDate = rsJob.getTimestamp(8); - String updateBy = rsJob.getString(9); - Date lastUpdateDate = rsJob.getTimestamp(10); - - MJob.Type type = MJob.Type.valueOf(stringType); - - formConnectorFetchStmt.setLong(1, connectorId); + long fromConnectorId = rsJob.getLong(1); + long toConnectorId = rsJob.getLong(2); + long id = rsJob.getLong(3); + String name = rsJob.getString(4); + long fromConnectionId = rsJob.getLong(5); + long toConnectionId = rsJob.getLong(6); + boolean enabled = rsJob.getBoolean(7); + String createBy = rsJob.getString(8); + Date creationDate = rsJob.getTimestamp(9); + String updateBy = rsJob.getString(10); + Date lastUpdateDate = rsJob.getTimestamp(11); + + formConnectorFetchStmt.setLong(1, fromConnectorId); inputFetchStmt.setLong(1, id); //inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms @@ -1746,20 +1741,20 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { List<MForm> connectorConnForms = new ArrayList<MForm>(); List<MForm> frameworkConnForms = new ArrayList<MForm>(); + List<MForm> frameworkJobForms = new ArrayList<MForm>(); + Map<ConnectorType, List<MForm>> connectorJobForms = new HashMap<ConnectorType, List<MForm>>(); - Map<MJob.Type, List<MForm>> connectorJobForms - = new HashMap<MJob.Type, List<MForm>>(); - Map<MJob.Type, List<MForm>> frameworkJobForms - = new HashMap<MJob.Type, List<MForm>>(); - - loadForms(connectorConnForms, connectorJobForms, - formConnectorFetchStmt, inputFetchStmt, 2); + loadConnectorForms(connectorConnForms, connectorJobForms, + formConnectorFetchStmt, inputFetchStmt, 2); loadForms(frameworkConnForms, frameworkJobForms, formFrameworkFetchStmt, inputFetchStmt, 2); - MJob job = new MJob(connectorId, connectionId, type, - new MJobForms(type, connectorJobForms.get(type)), - new MJobForms(type, frameworkJobForms.get(type))); + MJob job = new MJob( + fromConnectorId, toConnectorId, + fromConnectionId, toConnectionId, + new MJobForms(connectorJobForms.get(ConnectorType.FROM)), + new MJobForms(connectorJobForms.get(ConnectorType.TO)), + new MJobForms(frameworkJobForms)); job.setPersistenceId(id); job.setName(name); @@ -1773,8 +1768,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } finally { closeResultSets(rsJob); - closeStatements(formConnectorFetchStmt, - formFrameworkFetchStmt, inputFetchStmt); + closeStatements(formConnectorFetchStmt, formFrameworkFetchStmt, inputFetchStmt); } return jobs; @@ -1791,23 +1785,25 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * @param type * @param baseFormStmt * @param baseInputStmt + * @return short number of forms registered. * @throws SQLException */ - private void registerForms(Long connectorId, MJob.Type jobType, + private short registerForms(Long connectorId, ConnectorType connectorType, List<MForm> forms, String type, PreparedStatement baseFormStmt, PreparedStatement baseInputStmt) throws SQLException { short formIndex = 0; + for (MForm form : forms) { if(connectorId == null) { baseFormStmt.setNull(1, Types.BIGINT); } else { baseFormStmt.setLong(1, connectorId); } - if(jobType == null) { + if(connectorType == null) { baseFormStmt.setNull(2, Types.VARCHAR); } else { - baseFormStmt.setString(2, jobType.name()); + baseFormStmt.setString(2, connectorType.name()); } baseFormStmt.setString(3, form.getName()); baseFormStmt.setString(4, type); @@ -1830,6 +1826,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { List<MInput<?>> inputs = form.getInputs(); registerFormInputs(formId, inputs, baseInputStmt); } + return formIndex; } /** @@ -1921,7 +1918,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * @throws SQLException In case of any failure on Derby side */ public void loadForms(List<MForm> connectionForms, - Map<MJob.Type, List<MForm>> jobForms, + List<MForm> jobForms, PreparedStatement formFetchStmt, PreparedStatement inputFetchStmt, int formPosition) throws SQLException { @@ -2022,20 +2019,15 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { connectionForms.add(mf); break; case JOB: - MJob.Type jobType = MJob.Type.valueOf(operation); - if (!jobForms.containsKey(jobType)) { - jobForms.put(jobType, new ArrayList<MForm>()); - } - - if (jobForms.get(jobType).size() != formIndex) { + if (jobForms.size() != formIndex) { throw new SqoopException(DerbyRepoError.DERBYREPO_0010, "connector-" + formConnectorId + "; form: " + mf + "; index: " + formIndex - + "; expected: " + jobForms.get(jobType).size() + + "; expected: " + jobForms.size() ); } - jobForms.get(jobType).add(mf); + jobForms.add(mf); break; default: throw new SqoopException(DerbyRepoError.DERBYREPO_0007, @@ -2044,17 +2036,141 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } } - public List<MJobForms> convertToJobList(Map<MJob.Type, List<MForm>> l) { - List<MJobForms> ret = new ArrayList<MJobForms>(); + /** + * Load forms and corresponding inputs from Derby database. + * + * Use given prepared statements to load all forms and corresponding inputs + * from Derby. + * + * @param connectionForms List of connection forms that will be filled up + * @param jobForms Map with job forms that will be filled up + * @param formFetchStmt Prepared statement for fetching forms + * @param inputFetchStmt Prepare statement for fetching inputs + * @throws SQLException In case of any failure on Derby side + */ + public void loadConnectorForms(List<MForm> connectionForms, + Map<ConnectorType, List<MForm>> jobForms, + PreparedStatement formFetchStmt, + PreparedStatement inputFetchStmt, + int formPosition) throws SQLException { - for (Map.Entry<MJob.Type, List<MForm>> entry : l.entrySet()) { - MJob.Type type = entry.getKey(); - List<MForm> forms = entry.getValue(); + // Get list of structures from database + ResultSet rsetForm = formFetchStmt.executeQuery(); + while (rsetForm.next()) { + long formId = rsetForm.getLong(1); + Long formConnectorId = rsetForm.getLong(2); + String operation = rsetForm.getString(3); + String formName = rsetForm.getString(4); + String formType = rsetForm.getString(5); + int formIndex = rsetForm.getInt(6); + List<MInput<?>> formInputs = new ArrayList<MInput<?>>(); - ret.add(new MJobForms(type, forms)); - } + MForm mf = new MForm(formName, formInputs); + mf.setPersistenceId(formId); - return ret; + inputFetchStmt.setLong(formPosition, formId); + + ResultSet rsetInput = inputFetchStmt.executeQuery(); + while (rsetInput.next()) { + long inputId = rsetInput.getLong(1); + String inputName = rsetInput.getString(2); + long inputForm = rsetInput.getLong(3); + short inputIndex = rsetInput.getShort(4); + String inputType = rsetInput.getString(5); + boolean inputSensitivity = rsetInput.getBoolean(6); + short inputStrLength = rsetInput.getShort(7); + String inputEnumValues = rsetInput.getString(8); + String value = rsetInput.getString(9); + + MInputType mit = MInputType.valueOf(inputType); + + MInput input = null; + switch (mit) { + case STRING: + input = new MStringInput(inputName, inputSensitivity, inputStrLength); + break; + case MAP: + input = new MMapInput(inputName, inputSensitivity); + break; + case BOOLEAN: + input = new MBooleanInput(inputName, inputSensitivity); + break; + case INTEGER: + input = new MIntegerInput(inputName, inputSensitivity); + break; + case ENUM: + input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(",")); + break; + default: + throw new SqoopException(DerbyRepoError.DERBYREPO_0006, + "input-" + inputName + ":" + inputId + ":" + + "form-" + inputForm + ":" + mit.name()); + } + + // Set persistent ID + input.setPersistenceId(inputId); + + // Set value + if(value == null) { + input.setEmpty(); + } else { + input.restoreFromUrlSafeValueString(value); + } + + if (mf.getInputs().size() != inputIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0009, + "form: " + mf + + "; input: " + input + + "; index: " + inputIndex + + "; expected: " + mf.getInputs().size() + ); + } + + mf.getInputs().add(input); + } + + if (mf.getInputs().size() == 0) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0008, + "connector-" + formConnectorId + + "; form: " + mf + ); + } + + MFormType mft = MFormType.valueOf(formType); + switch (mft) { + case CONNECTION: + if (connectionForms.size() != formIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0010, + "connector-" + formConnectorId + + "; form: " + mf + + "; index: " + formIndex + + "; expected: " + connectionForms.size() + ); + } + connectionForms.add(mf); + break; + case JOB: + ConnectorType type = ConnectorType.valueOf(operation); + if (!jobForms.containsKey(type)) { + jobForms.put(type, new ArrayList<MForm>()); + } + + if (jobForms.get(type).size() != formIndex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0010, + "connector-" + formConnectorId + + "; form: " + mf + + "; index: " + formIndex + + "; expected: " + jobForms.get(type).size() + ); + } + + jobForms.get(type).add(mf); + break; + default: + throw new SqoopException(DerbyRepoError.DERBYREPO_0007, + "connector-" + formConnectorId + ":" + mf); + } + } } private void createInputValues(String query, http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java index fcbb475..1a77360 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaConstants.java @@ -144,9 +144,9 @@ public final class DerbySchemaConstants { public static final String COLUMN_SQB_NAME = "SQB_NAME"; - public static final String COLUMN_SQB_TYPE = "SQB_TYPE"; + public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION"; - public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION"; + public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION"; public static final String COLUMN_SQB_CREATION_USER = "SQB_CREATION_USER"; http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index 7042a53..e5bb2e7 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -286,13 +286,13 @@ public final class DerbySchemaQuery { public static final String QUERY_CREATE_TABLE_SQ_JOB = "CREATE TABLE " + TABLE_SQ_JOB + " (" + COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, " - + COLUMN_SQB_CONNECTION + " BIGINT, " + + COLUMN_SQB_FROM_CONNECTION + " BIGINT, " + + COLUMN_SQB_TO_CONNECTION + " BIGINT, " + COLUMN_SQB_NAME + " VARCHAR(64), " - + COLUMN_SQB_TYPE + " VARCHAR(64)," + COLUMN_SQB_CREATION_DATE + " TIMESTAMP," + COLUMN_SQB_UPDATE_DATE + " TIMESTAMP," + "CONSTRAINT " + CONSTRAINT_SQB_SQN + " " - + "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") " + + "FOREIGN KEY(" + COLUMN_SQB_FROM_CONNECTION + ") " + "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")" + ")"; @@ -702,8 +702,8 @@ public final class DerbySchemaQuery { public static final String STMT_INSERT_JOB = "INSERT INTO " + TABLE_SQ_JOB + " (" + COLUMN_SQB_NAME + ", " - + COLUMN_SQB_CONNECTION + ", " - + COLUMN_SQB_TYPE + ", " + + COLUMN_SQB_FROM_CONNECTION + ", " + + COLUMN_SQB_TO_CONNECTION + ", " + COLUMN_SQB_ENABLED + ", " + COLUMN_SQB_CREATION_USER + ", " + COLUMN_SQB_CREATION_DATE + ", " @@ -753,43 +753,49 @@ public final class DerbySchemaQuery { + " count(*)" + " FROM " + TABLE_SQ_JOB + " JOIN " + TABLE_SQ_CONNECTION - + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID + " WHERE " + COLUMN_SQN_ID + " = ? "; // DML: Select one specific job public static final String STMT_SELECT_JOB_SINGLE = "SELECT " - + COLUMN_SQN_CONNECTOR + ", " - + COLUMN_SQB_ID + ", " - + COLUMN_SQB_NAME + ", " - + COLUMN_SQB_CONNECTION + ", " - + COLUMN_SQB_TYPE + ", " - + COLUMN_SQB_ENABLED + ", " - + COLUMN_SQB_CREATION_USER + ", " - + COLUMN_SQB_CREATION_DATE + ", " - + COLUMN_SQB_UPDATE_USER + ", " - + COLUMN_SQB_UPDATE_DATE - + " FROM " + TABLE_SQ_JOB + + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", " + + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", " + + "job." + COLUMN_SQB_ID + ", " + + "job." + COLUMN_SQB_NAME + ", " + + "job." + COLUMN_SQB_FROM_CONNECTION + ", " + + "job." + COLUMN_SQB_TO_CONNECTION + ", " + + "job." + COLUMN_SQB_ENABLED + ", " + + "job." + COLUMN_SQB_CREATION_USER + ", " + + "job." + COLUMN_SQB_CREATION_DATE + ", " + + "job." + COLUMN_SQB_UPDATE_USER + ", " + + "job." + COLUMN_SQB_UPDATE_DATE + + " FROM " + TABLE_SQ_JOB + " AS job" + + " LEFT JOIN " + TABLE_SQ_CONNECTION + + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID + " LEFT JOIN " + TABLE_SQ_CONNECTION - + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID + " WHERE " + COLUMN_SQB_ID + " = ?"; // DML: Select all jobs public static final String STMT_SELECT_JOB_ALL = "SELECT " - + COLUMN_SQN_CONNECTOR + ", " - + COLUMN_SQB_ID + ", " - + COLUMN_SQB_NAME + ", " - + COLUMN_SQB_CONNECTION + ", " - + COLUMN_SQB_TYPE + ", " - + COLUMN_SQB_ENABLED + ", " - + COLUMN_SQB_CREATION_USER + ", " - + COLUMN_SQB_CREATION_DATE + ", " - + COLUMN_SQB_UPDATE_USER + ", " - + COLUMN_SQB_UPDATE_DATE - + " FROM " + TABLE_SQ_JOB + + "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", " + + "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", " + + "job." + COLUMN_SQB_ID + ", " + + "job." + COLUMN_SQB_NAME + ", " + + "job." + COLUMN_SQB_FROM_CONNECTION + ", " + + "job." + COLUMN_SQB_TO_CONNECTION + ", " + + "job." + COLUMN_SQB_ENABLED + ", " + + "job." + COLUMN_SQB_CREATION_USER + ", " + + "job." + COLUMN_SQB_CREATION_DATE + ", " + + "job." + COLUMN_SQB_UPDATE_USER + ", " + + "job." + COLUMN_SQB_UPDATE_DATE + + " FROM " + TABLE_SQ_JOB + " AS job" + + " LEFT JOIN " + TABLE_SQ_CONNECTION + + " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID + " LEFT JOIN " + TABLE_SQ_CONNECTION - + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID; + + " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID; // DML: Select all jobs for a Connector public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = @@ -797,8 +803,8 @@ public final class DerbySchemaQuery { + COLUMN_SQN_CONNECTOR + ", " + COLUMN_SQB_ID + ", " + COLUMN_SQB_NAME + ", " - + COLUMN_SQB_CONNECTION + ", " - + COLUMN_SQB_TYPE + ", " + + COLUMN_SQB_FROM_CONNECTION + ", " + + COLUMN_SQB_TO_CONNECTION + ", " + COLUMN_SQB_ENABLED + ", " + COLUMN_SQB_CREATION_USER + ", " + COLUMN_SQB_CREATION_DATE + ", " @@ -806,7 +812,7 @@ public final class DerbySchemaQuery { + COLUMN_SQB_UPDATE_DATE + " FROM " + TABLE_SQ_JOB + " LEFT JOIN " + TABLE_SQ_CONNECTION - + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID + + " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID + " AND " + COLUMN_SQN_CONNECTOR + " = ? "; // DML: Insert new submission http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java index c9c7648..2721846 100644 --- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java @@ -24,8 +24,8 @@ import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.json.ConnectionBean; +import org.apache.sqoop.json.ConnectionValidationBean; import org.apache.sqoop.json.JsonBean; -import org.apache.sqoop.json.ValidationBean; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MConnectionForms; @@ -204,8 +204,8 @@ public class ConnectionRequestHandler implements RequestHandler { frameworkValidation.getStatus()); // Return back validations in all cases - ValidationBean outputBean = - new ValidationBean(connectorValidation, frameworkValidation); + ConnectionValidationBean outputBean = + new ConnectionValidationBean(connectorValidation, frameworkValidation); // If we're good enough let's perform the action if(finalStatus.canProceed()) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java index 362ba79..473bb46 100644 --- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java +++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java @@ -19,13 +19,14 @@ package org.apache.sqoop.handler; import org.apache.log4j.Logger; import org.apache.sqoop.audit.AuditLoggerManager; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.json.JobBean; +import org.apache.sqoop.json.JobValidationBean; import org.apache.sqoop.json.JsonBean; -import org.apache.sqoop.json.ValidationBean; import org.apache.sqoop.model.FormUtils; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MJobForms; @@ -163,47 +164,59 @@ public class JobRequestHandler implements RequestHandler { MJob job = jobs.get(0); // Verify that user is not trying to spoof us - MJobForms connectorForms - = ConnectorManager.getInstance().getConnectorMetadata(job.getConnectorId()) - .getJobForms(job.getType()); + MJobForms fromConnectorForms = ConnectorManager.getInstance() + .getConnectorMetadata(job.getConnectorId(ConnectorType.FROM)) + .getJobForms(ConnectorType.FROM); + MJobForms toConnectorForms = ConnectorManager.getInstance() + .getConnectorMetadata(job.getConnectorId(ConnectorType.TO)) + .getJobForms(ConnectorType.TO); MJobForms frameworkForms = FrameworkManager.getInstance().getFramework() - .getJobForms(job.getType()); + .getJobForms(); - if(!connectorForms.equals(job.getConnectorPart()) - || !frameworkForms.equals(job.getFrameworkPart())) { + if(!fromConnectorForms.equals(job.getConnectorPart(ConnectorType.FROM)) + || !frameworkForms.equals(job.getFrameworkPart()) + || !toConnectorForms.equals(job.getConnectorPart(ConnectorType.TO))) { throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect form structure"); } // Responsible connector for this session - SqoopConnector connector = - ConnectorManager.getInstance().getConnector(job.getConnectorId()); + SqoopConnector fromConnector = + ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.FROM)); + SqoopConnector toConnector = + ConnectorManager.getInstance().getConnector(job.getConnectorId(ConnectorType.TO)); // Get validator objects - Validator connectorValidator = connector.getValidator(); + Validator fromConnectorValidator = fromConnector.getValidator(); Validator frameworkValidator = FrameworkManager.getInstance().getValidator(); + Validator toConnectorValidator = toConnector.getValidator(); // We need translate forms to configuration objects - Object connectorConfig = ClassUtils.instantiate( - connector.getJobConfigurationClass(job.getType())); + Object fromConnectorConfig = ClassUtils.instantiate( + fromConnector.getJobConfigurationClass(ConnectorType.FROM)); Object frameworkConfig = ClassUtils.instantiate( - FrameworkManager.getInstance().getJobConfigurationClass(job.getType())); + FrameworkManager.getInstance().getJobConfigurationClass()); + Object toConnectorConfig = ClassUtils.instantiate( + toConnector.getJobConfigurationClass(ConnectorType.TO)); - FormUtils.fromForms(job.getConnectorPart().getForms(), connectorConfig); + FormUtils.fromForms(job.getConnectorPart(ConnectorType.FROM).getForms(), fromConnectorConfig); FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkConfig); + FormUtils.fromForms(job.getConnectorPart(ConnectorType.TO).getForms(), toConnectorConfig); - // Validate both parts - Validation connectorValidation = - connectorValidator.validateJob(job.getType(), connectorConfig); + // Validate all parts + Validation fromConnectorValidation = + fromConnectorValidator.validateJob(fromConnectorConfig); Validation frameworkValidation = - frameworkValidator.validateJob(job.getType(), frameworkConfig); + frameworkValidator.validateJob(frameworkConfig); + Validation toConnectorValidation = + toConnectorValidator.validateJob(toConnectorConfig); - Status finalStatus = Status.getWorstStatus(connectorValidation.getStatus(), - frameworkValidation.getStatus()); + Status finalStatus = Status.getWorstStatus(fromConnectorValidation.getStatus(), + frameworkValidation.getStatus(), toConnectorValidation.getStatus()); // Return back validations in all cases - ValidationBean outputBean = - new ValidationBean(connectorValidation, frameworkValidation); + JobValidationBean outputBean = + new JobValidationBean(fromConnectorValidation, frameworkValidation, toConnectorValidation); // If we're good enough let's perform the action if(finalStatus.canProceed()) { @@ -247,8 +260,9 @@ public class JobRequestHandler implements RequestHandler { bean = new JobBean(jobs); // Add associated resources into the bean + // @TODO(Abe): From/To. for( MJob job : jobs) { - long connectorId = job.getConnectorId(); + long connectorId = job.getConnectorId(ConnectorType.FROM); if(!bean.hasConnectorBundle(connectorId)) { bean.addConnectorBundle(connectorId, ConnectorManager.getInstance().getResourceBundle(connectorId, locale)); @@ -258,7 +272,8 @@ public class JobRequestHandler implements RequestHandler { long jid = Long.valueOf(sjid); MJob job = repository.findJob(jid); - long connectorId = job.getConnectorId(); + // @TODO(Abe): From/To + long connectorId = job.getConnectorId(ConnectorType.FROM); bean = new JobBean(job); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java index f80552c..74c863d 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/CloneJobFunction.java @@ -20,6 +20,7 @@ package org.apache.sqoop.shell; import jline.ConsoleReader; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MPersistableEntity; import org.apache.sqoop.shell.core.Constants; @@ -70,8 +71,11 @@ public class CloneJobFunction extends SqoopFunction { MJob job = client.getJob(jobId); job.setPersistenceId(MPersistableEntity.PERSISTANCE_ID_DEFAULT); - ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId()); + ResourceBundle fromConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.FROM)); ResourceBundle frameworkBundle = client.getFrameworkResourceBundle(); + ResourceBundle toConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.TO)); Status status = Status.FINE; @@ -88,7 +92,7 @@ public class CloneJobFunction extends SqoopFunction { } // Fill in data from user - if(!fillJob(reader, job, connectorBundle, frameworkBundle)) { + if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) { return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java index 598adbc..de246cb 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/CreateJobFunction.java @@ -20,6 +20,7 @@ package org.apache.sqoop.shell; import jline.ConsoleReader; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MJob; import org.apache.sqoop.shell.core.Constants; import org.apache.sqoop.shell.utils.FormDisplayer; @@ -43,26 +44,26 @@ public class CreateJobFunction extends SqoopFunction { public CreateJobFunction() { this.addOption(OptionBuilder .withDescription(resourceString(Constants.RES_PROMPT_CONN_ID)) - .withLongOpt(Constants.OPT_XID) + .withLongOpt(Constants.OPT_FXID) .hasArg() - .create(Constants.OPT_XID_CHAR) + .create(Constants.OPT_FXID_CHAR) ); this.addOption(OptionBuilder - .withDescription(resourceString(Constants.RES_PROMPT_JOB_TYPE)) - .withLongOpt(Constants.OPT_TYPE) + .withDescription(resourceString(Constants.RES_PROMPT_CONN_ID)) + .withLongOpt(Constants.OPT_TXID) .hasArg() - .create(Constants.OPT_TYPE_CHAR) + .create(Constants.OPT_TXID_CHAR) ); } @Override public boolean validateArgs(CommandLine line) { - if (!line.hasOption(Constants.OPT_XID)) { - printlnResource(Constants.RES_ARGS_XID_MISSING); + if (!line.hasOption(Constants.OPT_FXID)) { + printlnResource(Constants.RES_ARGS_FXID_MISSING); return false; } - if (!line.hasOption(Constants.OPT_TYPE)) { - printlnResource(Constants.RES_ARGS_TYPE_MISSING); + if (!line.hasOption(Constants.OPT_TXID)) { + printlnResource(Constants.RES_ARGS_TXID_MISSING); return false; } return true; @@ -71,19 +72,23 @@ public class CreateJobFunction extends SqoopFunction { @Override @SuppressWarnings("unchecked") public Object executeFunction(CommandLine line, boolean isInteractive) throws IOException { - return createJob(getLong(line, Constants.OPT_XID), - line.getOptionValue(Constants.OPT_TYPE), + return createJob(getLong(line, Constants.OPT_FXID), + getLong(line, Constants.OPT_TXID), line.getArgList(), isInteractive); } - private Status createJob(Long connectionId, String type, List<String> args, boolean isInteractive) throws IOException { - printlnResource(Constants.RES_CREATE_CREATING_JOB, connectionId); + private Status createJob(Long fromConnectionId, Long toConnectionId, List<String> args, boolean isInteractive) throws IOException { + printlnResource(Constants.RES_CREATE_CREATING_JOB, fromConnectionId, toConnectionId); ConsoleReader reader = new ConsoleReader(); - MJob job = client.newJob(connectionId, MJob.Type.valueOf(type.toUpperCase())); + MJob job = client.newJob(fromConnectionId, toConnectionId); - ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId()); + // @TODO(Abe): From/To. + ResourceBundle fromConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.FROM)); + ResourceBundle toConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.TO)); ResourceBundle frameworkBundle = client.getFrameworkResourceBundle(); Status status = Status.FINE; @@ -98,7 +103,7 @@ public class CreateJobFunction extends SqoopFunction { } // Fill in data from user - if(!fillJob(reader, job, connectorBundle, frameworkBundle)) { + if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) { return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java index 54d8e9a..c345ada 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/DeleteConnectionFunction.java @@ -40,7 +40,7 @@ public class DeleteConnectionFunction extends SqoopFunction { @Override public boolean validateArgs(CommandLine line) { - if (!line.hasOption(Constants.OPT_XID)) { + if (!line.hasOption(Constants.OPT_FXID)) { printlnResource(Constants.RES_ARGS_XID_MISSING); return false; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java index 6e5c9b5..dfaa90e 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/ShowConnectionFunction.java @@ -42,9 +42,9 @@ public class ShowConnectionFunction extends SqoopFunction { .withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_ALL_CONNS)) .withLongOpt(Constants.OPT_ALL) .create(Constants.OPT_ALL_CHAR)); - this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_XID) + this.addOption(OptionBuilder.hasArg().withArgName(Constants.OPT_FXID) .withDescription(resourceString(Constants.RES_SHOW_PROMPT_DISPLAY_CONN_XID)) - .withLongOpt(Constants.OPT_XID) + .withLongOpt(Constants.OPT_FXID) .create(Constants.OPT_XID_CHAR)); } @@ -52,8 +52,8 @@ public class ShowConnectionFunction extends SqoopFunction { public Object executeFunction(CommandLine line, boolean isInteractive) { if (line.hasOption(Constants.OPT_ALL)) { showConnections(); - } else if (line.hasOption(Constants.OPT_XID)) { - showConnection(getLong(line, Constants.OPT_XID)); + } else if (line.hasOption(Constants.OPT_FXID)) { + showConnection(getLong(line, Constants.OPT_FXID)); } else { showSummary(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java index 9a5386c..4618211 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/ShowJobFunction.java @@ -19,6 +19,7 @@ package org.apache.sqoop.shell; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MJob; import org.apache.sqoop.shell.core.Constants; import org.apache.sqoop.shell.utils.TableDisplayer; @@ -67,25 +68,27 @@ public class ShowJobFunction extends SqoopFunction { List<String> header = new LinkedList<String>(); header.add(resourceString(Constants.RES_TABLE_HEADER_ID)); header.add(resourceString(Constants.RES_TABLE_HEADER_NAME)); - header.add(resourceString(Constants.RES_TABLE_HEADER_TYPE)); - header.add(resourceString(Constants.RES_TABLE_HEADER_CONNECTOR)); + header.add(resourceString(Constants.RES_TABLE_HEADER_FROM_CONNECTOR)); + header.add(resourceString(Constants.RES_TABLE_HEADER_TO_CONNECTOR)); header.add(resourceString(Constants.RES_TABLE_HEADER_ENABLED)); List<String> ids = new LinkedList<String>(); List<String> names = new LinkedList<String>(); - List<String> types = new LinkedList<String>(); - List<String> connectors = new LinkedList<String>(); + List<String> fromConnectors = new LinkedList<String>(); + List<String> toConnectors = new LinkedList<String>(); List<String> availabilities = new LinkedList<String>(); for(MJob job : jobs) { ids.add(String.valueOf(job.getPersistenceId())); names.add(job.getName()); - types.add(job.getType().toString()); - connectors.add(String.valueOf(job.getConnectorId())); + fromConnectors.add(String.valueOf( + job.getConnectorId(ConnectorType.FROM))); + toConnectors.add(String.valueOf( + job.getConnectorId(ConnectorType.TO))); availabilities.add(String.valueOf(job.getEnabled())); } - TableDisplayer.display(header, ids, names, types, connectors, availabilities); + TableDisplayer.display(header, ids, names, fromConnectors, toConnectors, availabilities); } private void showJobs() { @@ -118,13 +121,15 @@ public class ShowJobFunction extends SqoopFunction { formatter.format(job.getLastUpdateDate()) ); printlnResource(Constants.RES_SHOW_PROMPT_JOB_XID_CID_INFO, - job.getConnectionId(), - job.getConnectorId()); + job.getConnectionId(ConnectorType.FROM), + job.getConnectorId(ConnectorType.FROM)); // Display connector part - displayForms(job.getConnectorPart().getForms(), - client.getResourceBundle(job.getConnectorId())); + displayForms(job.getConnectorPart(ConnectorType.FROM).getForms(), + client.getResourceBundle(job.getConnectorId(ConnectorType.FROM))); displayForms(job.getFrameworkPart().getForms(), client.getFrameworkResourceBundle()); + displayForms(job.getConnectorPart(ConnectorType.TO).getForms(), + client.getResourceBundle(job.getConnectorId(ConnectorType.TO))); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba81ec7f/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java index b060bb4..fbaf661 100644 --- a/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java +++ b/shell/src/main/java/org/apache/sqoop/shell/UpdateJobFunction.java @@ -20,6 +20,7 @@ package org.apache.sqoop.shell; import jline.ConsoleReader; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.OptionBuilder; +import org.apache.sqoop.common.ConnectorType; import org.apache.sqoop.model.MJob; import org.apache.sqoop.shell.core.Constants; import org.apache.sqoop.shell.utils.FormDisplayer; @@ -70,8 +71,11 @@ public class UpdateJobFunction extends SqoopFunction { MJob job = client.getJob(jobId); - ResourceBundle connectorBundle = client.getResourceBundle(job.getConnectorId()); + ResourceBundle fromConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.FROM)); ResourceBundle frameworkBundle = client.getFrameworkResourceBundle(); + ResourceBundle toConnectorBundle = client.getResourceBundle( + job.getConnectorId(ConnectorType.TO)); Status status = Status.FINE; @@ -85,7 +89,7 @@ public class UpdateJobFunction extends SqoopFunction { } // Fill in data from user - if(!fillJob(reader, job, connectorBundle, frameworkBundle)) { + if(!fillJob(reader, job, fromConnectorBundle, frameworkBundle, toConnectorBundle)) { return status; }
