Repository: sqoop Updated Branches: refs/heads/sqoop2 410ee0462 -> 61dfb6db4
SQOOP-1516: Sqoop2: Config Input as a Top Level Entity - RepositoryAPI changes (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/61dfb6db Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/61dfb6db Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/61dfb6db Branch: refs/heads/sqoop2 Commit: 61dfb6db45d7f4ab7693b13d535626c6d4f2dd74 Parents: 410ee04 Author: Abraham Elmahrek <[email protected]> Authored: Wed Feb 25 13:56:56 2015 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Wed Feb 25 13:56:56 2015 -0800 ---------------------------------------------------------------------- .../sqoop/error/code/CommonRepositoryError.java | 16 ++ .../org/apache/sqoop/model/MConfigType.java | 26 ++- .../sqoop/model/MConfigUpdateEntityType.java | 34 ++++ .../main/java/org/apache/sqoop/model/MLink.java | 4 +- .../java/org/apache/sqoop/model/MMapInput.java | 2 +- .../apache/sqoop/repository/JdbcRepository.java | 117 +++++++++++++- .../sqoop/repository/JdbcRepositoryHandler.java | 62 +++++++- .../org/apache/sqoop/repository/Repository.java | 75 +++++++++ .../common/CommonRepositoryHandler.java | 137 ++++++++++++++-- ...RepositoryInsertUpdateDeleteSelectQuery.java | 23 +++ .../sqoop/repository/derby/DerbyTestCase.java | 6 +- .../sqoop/repository/derby/TestJobHandling.java | 159 +++++++++++++++++-- .../repository/derby/TestLinkHandling.java | 44 +++++ 13 files changed, 673 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java b/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java index 74e5de3..7db31dd 100644 --- a/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java +++ b/common/src/main/java/org/apache/sqoop/error/code/CommonRepositoryError.java @@ -196,6 +196,22 @@ public enum CommonRepositoryError implements ErrorCode { COMMON_0048("Config Input overrides could not be fetched"), + COMMON_0049("Unable to fetch FROM job config"), + + COMMON_0050("Unable to fetch TO job config"), + + COMMON_0051("Unable to fetch DRIVER job config"), + + COMMON_0052("Unable to fetch LINK config"), + + COMMON_0053("Unable to update job config"), + + COMMON_0054("Unable to update link config"), + + COMMON_0055("Unable to update CONNECTOR_ONLY editable config"), + + COMMON_0056("Unable to update USER_ONLY editable config"), + ; private final String message; http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MConfigType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigType.java b/common/src/main/java/org/apache/sqoop/model/MConfigType.java index fa29d5a..a8602fc 100644 --- a/common/src/main/java/org/apache/sqoop/model/MConfigType.java +++ b/common/src/main/java/org/apache/sqoop/model/MConfigType.java @@ -20,6 +20,11 @@ package org.apache.sqoop.model; import org.apache.sqoop.classification.InterfaceAudience; import org.apache.sqoop.classification.InterfaceStability; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + /** * Represents the various config types supported by the system. */ @@ -35,9 +40,26 @@ public enum MConfigType { CONNECTION, /** link config type */ - LINK, + LINK("link"), /** Job config type */ - JOB; + // NOTE: cannot use the constants declared below since it is not declared yet + // compiler restriction + JOB("from", "to", "driver"); + + private final Set<String> subTypes; + + MConfigType(String... subTypes) { + Set<String> subT = new HashSet<String>(); + subT.addAll(Arrays.asList(subTypes)); + this.subTypes = Collections.unmodifiableSet(subT); + } + + public static Set<String> getSubTypes(MConfigType type) { + return type.subTypes; + } + public static final String FROM_SUB_TYPE = "from"; + public static final String TO_SUB_TYPE = "to"; + public static final String DRIVER_SUB_TYPE = "driver"; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java b/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java new file mode 100644 index 0000000..6847804 --- /dev/null +++ b/common/src/main/java/org/apache/sqoop/model/MConfigUpdateEntityType.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.model; + +/** + * Represents the entities that can edit the config input values. + * + */ +public enum MConfigUpdateEntityType { + + /** update config values via rest API or command line */ + USER, + + /** update config values via connector upgrade tool */ + CONNECTOR, + + ; + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MLink.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MLink.java b/common/src/main/java/org/apache/sqoop/model/MLink.java index 062a4c5..01ff5df 100644 --- a/common/src/main/java/org/apache/sqoop/model/MLink.java +++ b/common/src/main/java/org/apache/sqoop/model/MLink.java @@ -81,8 +81,8 @@ public class MLink extends MAccountableEntity implements MClonable { public MLinkConfig getConnectorLinkConfig() { return connectorLinkConfig; } - public MConfig getConnectorLinkConfig(String formName) { - return connectorLinkConfig.getConfig(formName); + public MConfig getConnectorLinkConfig(String configName) { + return connectorLinkConfig.getConfig(configName); } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/common/src/main/java/org/apache/sqoop/model/MMapInput.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/sqoop/model/MMapInput.java b/common/src/main/java/org/apache/sqoop/model/MMapInput.java index ce0f0f7..973b5fa 100644 --- a/common/src/main/java/org/apache/sqoop/model/MMapInput.java +++ b/common/src/main/java/org/apache/sqoop/model/MMapInput.java @@ -48,7 +48,7 @@ public final class MMapInput extends MInput<Map<String, String>> { vsb.append("&"); } vsb.append(UrlSafeUtils.urlEncode(entry.getKey())).append("="); - vsb.append(UrlSafeUtils.urlEncode(entry.getValue())); + vsb.append(entry.getValue() != null ? UrlSafeUtils.urlEncode(entry.getValue()): null); } return vsb.toString(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index 091c3ca..9c5e15e 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -23,8 +23,11 @@ import java.util.List; import org.apache.log4j.Logger; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; +import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; @@ -657,6 +660,119 @@ public class JdbcRepository extends Repository { }); } + /** + * {@inheritDoc} + */ + @Override + public MConfig findFromJobConfig(final long jobId, final String configName) { + return (MConfig) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsJob(jobId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); + } + return handler.findFromJobConfig(jobId, configName, conn); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public MConfig findToJobConfig(final long jobId, final String configName) { + return (MConfig) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsJob(jobId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); + } + return handler.findToJobConfig(jobId, configName, conn); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public MConfig findDriverJobConfig(final long jobId, final String configName) { + return (MConfig) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsJob(jobId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); + } + return handler.findDriverJobConfig(jobId, configName, conn); + } + }); + } + + /** + * {@inheritDoc} + */ + @Override + public MConfig findLinkConfig(final long linkId, final String configName) { + return (MConfig) doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsLink(linkId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId); + } + return handler.findLinkConfig(linkId, configName, conn); + } + }); + } + + + /** + * {@inheritDoc} + */ + @Override + public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type) { + updateJobConfig(jobId, config, null); + } + /** + * {@inheritDoc} + */ + @Override + public void updateJobConfig(final long jobId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsJob(jobId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0020, "Invalid id: " + jobId); + } + handler.updateJobConfig(jobId, config, type, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + } + + /** + * {@inheritDoc} + */ + @Override + public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type) { + updateLinkConfig(linkId, config, type, null); + } + /** + * {@inheritDoc} + */ + @Override + public void updateLinkConfig(final long linkId, final MConfig config, final MConfigUpdateEntityType type, RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) { + if (!handler.existsLink(linkId, conn)) { + throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: " + linkId); + } + handler.updateLinkConfig(linkId, config, type, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + } + @Override protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) { doWithConnection(new DoWithConnection() { @@ -666,7 +782,6 @@ public class JdbcRepository extends Repository { return null; } }, (JdbcRepositoryTransaction) tx); - } @Override http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index f4b1374..f690887 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -21,6 +21,8 @@ import java.sql.Connection; import java.util.Date; import java.util.List; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MJob; @@ -416,4 +418,62 @@ public abstract class JdbcRepositoryHandler { * @return Most recent submission */ public abstract MSubmission findLastSubmissionForJob(long jobId, Connection conn); -} \ No newline at end of file + + /** + * fetch the job config for the FROM type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @param conn Connection to the repository + * @return config object + */ + public abstract MConfig findFromJobConfig(long jobId, String configName, Connection con); + + + /** + * fetch the job config for the TO type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @param conn Connection to the repository + * @return config object + */ + public abstract MConfig findToJobConfig(long jobId, String configName, Connection con); + + + /** + * fetch the job config for the DRIVER type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @param conn Connection to the repository + * @return config object + */ + public abstract MConfig findDriverJobConfig(long jobId, String configName, Connection con); + + + /** + * fetch the link config for the link type for the given name + * @param linkId id of the link + * @param configName name of the config unique to this link and type + * @param conn Connection to the repository + * @return config object + */ + public abstract MConfig findLinkConfig(long linkId, String configName, Connection con); + + /** + * Update the config object for the job + * @param jobId id of the job + * @param config name of the config + * @param type entity type updating the link config + * @param conn Connection to the repository + */ + public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type, Connection con); + + /** + * Update the config object for the link + * @param linkId id of the link + * @param config name of the config + * @param type entity type updating the link config + * @param conn Connection to the repository + */ + public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type, Connection con); + +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index e07676a..aa91661 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -33,6 +33,7 @@ import org.apache.sqoop.driver.DriverUpgrader; import org.apache.sqoop.json.DriverBean; import org.apache.sqoop.model.ConfigUtils; import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MConnector; import org.apache.sqoop.model.MDriver; import org.apache.sqoop.model.MDriverConfig; @@ -326,6 +327,80 @@ public abstract class Repository { */ public abstract MSubmission findLastSubmissionForJob(long jobId); + /** + * fetch the job config for the FROM type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @return config object + */ + public abstract MConfig findFromJobConfig(long jobId, String configName); + + + /** + * fetch the job config for the TO type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @return config object + */ + public abstract MConfig findToJobConfig(long jobId, String configName); + + + /** + * fetch the job config for the DRIVER type for the given name + * @param jobId id of the job + * @param configName name of the config unique to this job and type + * @return config object + */ + public abstract MConfig findDriverJobConfig(long jobId, String configName); + + + /** + * fetch the link config for the link type for the given name + * @param linkId id of the link + * @param configName name of the config unique to this link and type + * @return config object + */ + public abstract MConfig findLinkConfig(long linkId, String configName); + + + /** + * Update the config object for the job + * @param jobId id of the job + * @param config name of the config + * @param updateEntityType entity type updating the link config + */ + public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType); + + /** + * Update the config object for the job + * @param jobId id of the job + * @param config name of the config + * @param updateEntityType entity type updating the link config + * @param tx database transaction + */ + public abstract void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType updateEntityType, RepositoryTransaction tx); + + + /** + * Update the config object for the link + * @param linkId id of the link + * @param config name of the config + * @param updateEntityType entity type updating the link config + */ + public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType); + + /** + * Update the config object for the link + * @param linkId id of the link + * @param config name of the config + * @param updateEntityType entity type updating the link config + * @param tx database transaction + */ + public abstract void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType updateEntityType, RepositoryTransaction tx); + + + /*********************Configurable Upgrade APIs ******************************/ + /** * Update the connector with the new data supplied in the * <tt>newConnector</tt>. Also Update all configs associated with this http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java index 6a23fc2..87d2d9c 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java @@ -26,6 +26,7 @@ import org.apache.sqoop.common.SupportedDirections; import org.apache.sqoop.driver.Driver; import org.apache.sqoop.error.code.CommonRepositoryError; import org.apache.sqoop.model.InputEditable; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MLongInput; import org.apache.sqoop.model.SubmissionError; import org.apache.sqoop.model.MBooleanInput; @@ -1258,8 +1259,8 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { Statement.RETURN_GENERATED_KEYS); // Register link type config - registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(), - MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); + registerConfigs(connectorId, null /* No direction for LINK type config */, mc.getLinkConfig() + .getConfigs(), MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn); // Register both from/to job type config for connector if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) { @@ -1622,7 +1623,6 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { try { rsConnection = stmt.executeQuery(); - // connectorConfigFetchStatement = conn.prepareStatement(crudQueries.getStmtSelectConfigForConfigurable()); connectorConfigInputStatement = conn.prepareStatement(crudQueries.getStmtFetchLinkInput()); @@ -1719,7 +1719,6 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { List<MConfig> toConnectorFromJobConfig = new ArrayList<MConfig>(); List<MConfig> toConnectorToJobConfig = new ArrayList<MConfig>(); - // ?? dont we need 2 different driver configs for the from/to? List<MConfig> driverConfig = new ArrayList<MConfig>(); loadConnectorConfigs(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig, @@ -1947,6 +1946,125 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { return children; } + @Override + public MConfig findFromJobConfig(long jobId, String configName, Connection conn) { + MFromConfig fromConfigs = findJob(jobId, conn).getFromJobConfig(); + if (fromConfigs != null) { + MConfig config = fromConfigs.getConfig(configName); + if (config == null) { + throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName); + } + return config; + } + throw new SqoopException(CommonRepositoryError.COMMON_0049, "for configName :" + configName); + } + + @Override + public MConfig findToJobConfig(long jobId, String configName, Connection conn) { + MToConfig toConfigs = findJob(jobId, conn).getToJobConfig(); + if (toConfigs != null) { + MConfig config = toConfigs.getConfig(configName); + if (config == null) { + throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName); + } + return config; + } + throw new SqoopException(CommonRepositoryError.COMMON_0050, "for configName :" + configName); + } + + @Override + public MConfig findDriverJobConfig(long jobId, String configName, Connection conn) { + MDriverConfig driverConfigs = findJob(jobId, conn).getDriverConfig(); + if (driverConfigs != null) { + MConfig config = driverConfigs.getConfig(configName); + if (config == null) { + throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName); + } + return config; + } + throw new SqoopException(CommonRepositoryError.COMMON_0051, "for configName :" + configName); + } + + @Override + public MConfig findLinkConfig(long linkId, String configName, Connection conn) { + MConfig driverConfig = findLink(linkId, conn).getConnectorLinkConfig(configName); + if (driverConfig == null) { + throw new SqoopException(CommonRepositoryError.COMMON_0052, "for configName :" + configName); + } + return driverConfig; + } + + @SuppressWarnings("resource") + @Override + public void updateJobConfig(long jobId, MConfig config, MConfigUpdateEntityType type, + Connection conn) { + List<MInput<?>> inputs = config.getInputs(); + PreparedStatement updateStmt = null; + + try { + updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateJobInput()); + for (MInput<?> input : inputs) { + if (input.isEmpty()) { + continue; + } + validateEditableConstraints(type, input); + updateStmt.setString(1, input.getUrlSafeValueString()); + updateStmt.setLong(2, input.getPersistenceId()); + updateStmt.setLong(3, jobId); + updateStmt.executeUpdate(); + } + } catch (SQLException ex) { + logException(ex, jobId); + throw new SqoopException(CommonRepositoryError.COMMON_0053, ex); + } finally { + closeStatements(updateStmt); + } + } + + private void validateEditableConstraints(MConfigUpdateEntityType type, MInput<?> input) { + if (input.getEditable().equals(InputEditable.CONNECTOR_ONLY) + && type.equals(MConfigUpdateEntityType.USER)) { + throw new SqoopException(CommonRepositoryError.COMMON_0055); + } + if (input.getEditable().equals(InputEditable.USER_ONLY) + && type.equals(MConfigUpdateEntityType.CONNECTOR)) { + throw new SqoopException(CommonRepositoryError.COMMON_0056); + } + } + + @Override + public void updateLinkConfig(long linkId, MConfig config, MConfigUpdateEntityType type, + Connection conn) { + List<MInput<?>> inputs = config.getInputs(); + PreparedStatement updateStmt = null; + try { + updateStmt = conn.prepareStatement(crudQueries.getStmtUpdateLinkInput()); + for (MInput<?> input : inputs) { + if (input.isEmpty()) { + continue; + } + validateEditableConstraints(type, input); + updateStmt.setString(1, input.getUrlSafeValueString()); + updateStmt.setLong(2, input.getPersistenceId()); + updateStmt.setLong(3, linkId); + updateStmt.executeUpdate(); + } + } catch (SQLException ex) { + logException(ex, linkId); + throw new SqoopException(CommonRepositoryError.COMMON_0054, ex); + } finally { + closeStatements(updateStmt); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String validationQuery() { + return "values(1)"; // Yes, this is valid PostgreSQL SQL + } + /** * Load configs and corresponding inputs from Derby database. * @@ -2305,10 +2423,8 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { } } - private void createInputValues(String query, - long id, - List<MConfig> configs, - Connection conn) throws SQLException { + private void createInputValues(String query, long id, List<MConfig> configs, Connection conn) + throws SQLException { PreparedStatement stmt = null; int result; @@ -2316,7 +2432,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { stmt = conn.prepareStatement(query); for (MConfig config : configs) { - for (MInput input : config.getInputs()) { + for (MInput<?> input : config.getInputs()) { // Skip empty values as we're not interested in storing those in db if (input.isEmpty()) { continue; @@ -2327,8 +2443,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler { result = stmt.executeUpdate(); if (result != 1) { - throw new SqoopException(CommonRepositoryError.COMMON_0017, - Integer.toString(result)); + throw new SqoopException(CommonRepositoryError.COMMON_0017, Integer.toString(result)); } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java ---------------------------------------------------------------------- diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java index d61ff0b..3dbad01 100644 --- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java +++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java @@ -277,6 +277,14 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery { + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_ENABLED) + " = ? " + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_ID) + " = ?"; + + // UPDATE the LINK Input + public static final String STMT_UPDATE_LINK_INPUT = + "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME) + " SET " + + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_VALUE) + " = ? " + + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_INPUT) + " = ?" + + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNKI_LINK) + " = ?"; + // DML: Delete rows from link input table public static final String STMT_DELETE_LINK_INPUT = "DELETE FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_LINK_INPUT_NAME) @@ -384,6 +392,13 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery { + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ENABLED) + " = ? " + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQB_ID) + " = ?"; + // UPDATE the JOB Input + public static final String STMT_UPDATE_JOB_INPUT = + "UPDATE " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME) + " SET " + + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_VALUE) + " = ? " + + " WHERE " + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_INPUT) + " = ?" + + " AND " + CommonRepoUtils.escapeColumnName(COLUMN_SQBI_JOB) + " = ?"; + // DML: Delete rows from job input table public static final String STMT_DELETE_JOB_INPUT = "DELETE FROM " + CommonRepoUtils.getTableName(SCHEMA_SQOOP, TABLE_SQ_JOB_INPUT_NAME) @@ -695,6 +710,10 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery { return STMT_ENABLE_LINK; } + public String getStmtUpdateLinkInput() { + return STMT_UPDATE_LINK_INPUT; + } + public String getStmtDeleteLinkInput() { return STMT_DELETE_LINK_INPUT; } @@ -739,6 +758,10 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery { return STMT_ENABLE_JOB; } + public String getStmtUpdateJobInput() { + return STMT_UPDATE_JOB_INPUT; + } + public String getStmtDeleteJobInput() { return STMT_DELETE_JOB_INPUT; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java index bea5cd7..9ed7627 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/DerbyTestCase.java @@ -526,7 +526,7 @@ abstract public class DerbyTestCase { // First config runInsertQuery("INSERT INTO SQOOP.SQ_INPUT" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)" - + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30, 'CONNECTOR_ONLY')"); + + " VALUES('I1', " + (i * 2 + 1) + ", 0, 'STRING', false, 30, 'USER_ONLY')"); runInsertQuery("INSERT INTO SQOOP.SQ_INPUT" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)" + " VALUES('I2', " + (i * 2 + 1) + ", 1, 'MAP', false, 30, 'CONNECTOR_ONLY')"); @@ -534,10 +534,10 @@ abstract public class DerbyTestCase { // Second config runInsertQuery("INSERT INTO SQOOP.SQ_INPUT" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)" - + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30, 'CONNECTOR_ONLY')"); + + " VALUES('I3', " + (i * 2 + 2) + ", 0, 'STRING', false, 30, 'USER_ONLY')"); runInsertQuery("INSERT INTO SQOOP.SQ_INPUT" + "(SQI_NAME, SQI_CONFIG, SQI_INDEX, SQI_TYPE, SQI_STRMASK, SQI_STRLENGTH, SQI_EDITABLE)" - + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30, 'CONNECTOR_ONLY')"); + + " VALUES('I4', " + (i * 2 + 2) + ", 1, 'MAP', false, 30, 'USER_ONLY')"); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java index 6a248e9..b889b85 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java @@ -32,14 +32,18 @@ import java.util.Map; import org.apache.sqoop.common.Direction; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MDriver; +import org.apache.sqoop.model.MFromConfig; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MMapInput; import org.apache.sqoop.model.MStringInput; +import org.apache.sqoop.model.MToConfig; import org.apache.sqoop.error.code.CommonRepositoryError; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + /** * Test job methods on Derby repository. */ @@ -244,11 +248,19 @@ public class TestJobHandling extends DerbyTestCase { configs = job.getJobConfig(Direction.FROM).getConfigs(); ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated"); - ((MMapInput)configs.get(0).getInputs().get(1)).setValue(null); + Map<String, String> newFromMap = new HashMap<String, String>(); + newFromMap.put("1F", "foo"); + newFromMap.put("2F", "bar"); + + ((MMapInput)configs.get(0).getInputs().get(1)).setValue(newFromMap); configs = job.getJobConfig(Direction.TO).getConfigs(); ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated"); - ((MMapInput)configs.get(0).getInputs().get(1)).setValue(null); + Map<String, String> newToMap = new HashMap<String, String>(); + newToMap.put("1T", "foo"); + newToMap.put("2T", "bar"); + + ((MMapInput)configs.get(0).getInputs().get(1)).setValue(newToMap); configs = job.getDriverConfig().getConfigs(); ((MStringInput)configs.get(0).getInputs().get(0)).setValue("Updated"); @@ -262,7 +274,7 @@ public class TestJobHandling extends DerbyTestCase { assertEquals(1, job.getPersistenceId()); assertCountForTable("SQOOP.SQ_JOB", 4); - assertCountForTable("SQOOP.SQ_JOB_INPUT", 26); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 28); MJob retrieved = handler.findJob(1, derbyConnection); assertEquals("name", retrieved.getName()); @@ -270,17 +282,17 @@ public class TestJobHandling extends DerbyTestCase { configs = job.getJobConfig(Direction.FROM).getConfigs(); assertEquals(2, configs.size()); assertEquals("Updated", configs.get(0).getInputs().get(0).getValue()); - assertNull(configs.get(0).getInputs().get(1).getValue()); + assertEquals(newFromMap, configs.get(0).getInputs().get(1).getValue()); configs = job.getJobConfig(Direction.TO).getConfigs(); assertEquals(2, configs.size()); assertEquals("Updated", configs.get(0).getInputs().get(0).getValue()); - assertNull(configs.get(0).getInputs().get(1).getValue()); + assertEquals(newToMap, configs.get(0).getInputs().get(1).getValue()); configs = retrieved.getDriverConfig().getConfigs(); assertEquals(2, configs.size()); assertEquals("Updated", configs.get(0).getInputs().get(0).getValue()); assertNotNull(configs.get(0).getInputs().get(1).getValue()); - assertEquals(((Map)configs.get(0).getInputs().get(1).getValue()).size(), 0); + assertEquals(((Map) configs.get(0).getInputs().get(1).getValue()).size(), 0); } @Test @@ -323,11 +335,136 @@ public class TestJobHandling extends DerbyTestCase { assertCountForTable("SQOOP.SQ_JOB_INPUT", 0); } + @Test + public void testUpdateJobConfig() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + MJob job = handler.findJob(1, derbyConnection); + + List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs(); + MConfig fromConfig = fromConfigs.get(0).clone(false); + MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs()); + + ((MStringInput) newFromConfig.getInputs().get(0)).setValue("FromJobConfigUpdated"); + + handler.updateJobConfig(job.getPersistenceId(), newFromConfig, MConfigUpdateEntityType.USER, + derbyConnection); + + MJob updatedJob = handler.findJob(1, derbyConnection); + MFromConfig newFromConfigs = updatedJob.getFromJobConfig(); + assertEquals(2, newFromConfigs.getConfigs().size()); + MConfig updatedFromConfig = newFromConfigs.getConfigs().get(0); + assertEquals("FromJobConfigUpdated", updatedFromConfig.getInputs().get(0).getValue()); + + List<MConfig> toConfigs = job.getJobConfig(Direction.TO).getConfigs(); + MConfig toConfig = toConfigs.get(0).clone(false); + MConfig newToConfig = new MConfig(toConfig.getName(), toConfig.getInputs()); + + ((MStringInput) newToConfig.getInputs().get(0)).setValue("ToJobConfigUpdated"); + + handler.updateJobConfig(job.getPersistenceId(), newToConfig, MConfigUpdateEntityType.USER, + derbyConnection); + + updatedJob = handler.findJob(1, derbyConnection); + MToConfig newToConfigs = updatedJob.getToJobConfig(); + assertEquals(2, newToConfigs.getConfigs().size()); + MConfig updatedToConfig = newToConfigs.getConfigs().get(0); + assertEquals("ToJobConfigUpdated", updatedToConfig.getInputs().get(0).getValue()); + } + + @Test(expectedExceptions = SqoopException.class) + public void testIncorrectEntityCausingConfigUpdate() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + MJob job = handler.findJob(1, derbyConnection); + + List<MConfig> fromConfigs = job.getJobConfig(Direction.FROM).getConfigs(); + MConfig fromConfig = fromConfigs.get(0).clone(false); + MConfig newFromConfig = new MConfig(fromConfig.getName(), fromConfig.getInputs()); + HashMap<String, String> newMap = new HashMap<String, String>(); + newMap.put("1", "foo"); + newMap.put("2", "bar"); + + ((MMapInput) newFromConfig.getInputs().get(1)).setValue(newMap); + + handler.updateJobConfig(job.getPersistenceId(), newFromConfig, MConfigUpdateEntityType.USER, + derbyConnection); + } + + @Test + public void testFindAndUpdateJobConfig() throws Exception { + loadJobsForLatestVersion(); + MJob job = handler.findJob(1, derbyConnection); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + MConfig fromConfig = handler.findFromJobConfig(1, "C1JOB1", derbyConnection); + assertEquals("Value5", fromConfig.getInputs().get(0).getValue()); + assertNull(fromConfig.getInputs().get(1).getValue()); + + MConfig toConfig = handler.findToJobConfig(1, "C2JOB2", derbyConnection); + assertEquals("Value11", toConfig.getInputs().get(0).getValue()); + assertNull(toConfig.getInputs().get(1).getValue()); + HashMap<String, String> newMap = new HashMap<String, String>(); + newMap.put("1UPDATED", "foo"); + newMap.put("2UPDATED", "bar"); + ((MStringInput) toConfig.getInputs().get(0)).setValue("test"); + ((MMapInput) toConfig.getInputs().get(1)).setValue(newMap); + + handler.updateJobConfig(job.getPersistenceId(), toConfig, MConfigUpdateEntityType.USER, + derbyConnection); + assertEquals("test", toConfig.getInputs().get(0).getValue()); + assertEquals(newMap, toConfig.getInputs().get(1).getValue()); + + MConfig driverConfig = handler.findDriverJobConfig(1, "d1", derbyConnection); + assertEquals("Value13", driverConfig.getInputs().get(0).getValue()); + assertNull(driverConfig.getInputs().get(1).getValue()); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNonExistingFromConfigFetch() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + handler.findFromJobConfig(1, "Non-ExistingC1JOB1", derbyConnection); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNonExistingToConfigFetch() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + handler.findToJobConfig(1, "Non-ExistingC2JOB1", derbyConnection); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNonExistingDriverConfigFetch() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + handler.findDriverJobConfig(1, "Non-Existingd1", derbyConnection); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNonExistingJobConfig() throws Exception { + loadJobsForLatestVersion(); + + assertCountForTable("SQOOP.SQ_JOB", 4); + assertCountForTable("SQOOP.SQ_JOB_INPUT", 24); + // 11 does not exist + handler.findDriverJobConfig(11, "Non-d1", derbyConnection); + } + public MJob getJob() { - return new MJob(1, 1, 1, 1, - handler.findConnector("A", derbyConnection).getFromConfig(), - handler.findConnector("A", derbyConnection).getToConfig(), - handler.findDriver(MDriver.DRIVER_NAME, derbyConnection).getDriverConfig() - ); + return new MJob(1, 1, 1, 1, handler.findConnector("A", derbyConnection).getFromConfig(), + handler.findConnector("A", derbyConnection).getToConfig(), handler.findDriver( + MDriver.DRIVER_NAME, derbyConnection).getDriverConfig()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/61dfb6db/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java index 523464b..1ee7996 100644 --- a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java +++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestLinkHandling.java @@ -28,7 +28,9 @@ import java.util.List; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigUpdateEntityType; import org.apache.sqoop.model.MLink; +import org.apache.sqoop.model.MLinkConfig; import org.apache.sqoop.model.MMapInput; import org.apache.sqoop.model.MStringInput; import org.apache.sqoop.error.code.CommonRepositoryError; @@ -287,6 +289,48 @@ public class TestLinkHandling extends DerbyTestCase { assertCountForTable("SQOOP.SQ_LINK_INPUT", 0); } + @Test + public void testUpdateLinkConfig() throws Exception { + loadLinksForLatestVersion(); + + assertCountForTable("SQOOP.SQ_LINK", 2); + assertCountForTable("SQOOP.SQ_LINK_INPUT", 8); + MLink link = handler.findLink(1, getDerbyDatabaseConnection()); + + List<MConfig> configs = link.getConnectorLinkConfig().getConfigs(); + MConfig config = configs.get(0).clone(false); + MConfig newConfig = new MConfig(config.getName(), config.getInputs()); + + ((MStringInput) newConfig.getInputs().get(0)).setValue("LinkConfigUpdated"); + + handler.updateLinkConfig(link.getPersistenceId(), newConfig, MConfigUpdateEntityType.USER, + getDerbyDatabaseConnection()); + + MLink updatedLink = handler.findLink(1, getDerbyDatabaseConnection()); + MLinkConfig newConfigs = updatedLink.getConnectorLinkConfig(); + assertEquals(2, newConfigs.getConfigs().size()); + MConfig updatedLinkConfig = newConfigs.getConfigs().get(0); + assertEquals("LinkConfigUpdated", updatedLinkConfig.getInputs().get(0).getValue()); + } + + @Test(expectedExceptions = SqoopException.class) + public void testNonExistingLinkConfigFetch() throws Exception { + loadLinksForLatestVersion(); + assertCountForTable("SQOOP.SQ_LINK", 2); + assertCountForTable("SQOOP.SQ_LINK_INPUT", 8); + handler.findLinkConfig(1, "Non-ExistingC1LINK1", getDerbyDatabaseConnection()); + } + + @Test + public void testLinkConfigFetch() throws Exception { + loadLinksForLatestVersion(); + assertCountForTable("SQOOP.SQ_LINK", 2); + assertCountForTable("SQOOP.SQ_LINK_INPUT", 8); + MConfig config = handler.findLinkConfig(1, "C1LINK0", getDerbyDatabaseConnection()); + assertEquals("Value1", config.getInputs().get(0).getValue()); + assertNull(config.getInputs().get(1).getValue()); + } + public MLink getLink() { return new MLink(1, handler.findConnector("A", getDerbyDatabaseConnection()).getLinkConfig()); }
