Repository: ambari Updated Branches: refs/heads/branch-2.5 5c59a8a9b -> 462a3e8b4
AMBARI-21291 Schema error during upgrade related to request table (dgrinenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/462a3e8b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/462a3e8b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/462a3e8b Branch: refs/heads/branch-2.5 Commit: 462a3e8b4c2a8026490a8bf056f51f10e866503d Parents: 5c59a8a Author: Dmytro Grinenko <hapyles...@apache.org> Authored: Fri Jun 23 08:13:08 2017 +0300 Committer: Dmytro Grinenko <hapyles...@apache.org> Committed: Fri Jun 23 08:13:08 2017 +0300 ---------------------------------------------------------------------- .../apache/ambari/server/orm/DBAccessor.java | 41 ++++++- .../ambari/server/orm/DBAccessorImpl.java | 113 ++++++++++++++++--- .../server/orm/helpers/dbms/DbmsHelper.java | 10 ++ .../orm/helpers/dbms/GenericDbmsHelper.java | 12 ++ .../server/orm/helpers/dbms/H2Helper.java | 10 ++ .../server/upgrade/UpgradeCatalog251.java | 3 +- .../ambari/server/orm/DBAccessorImplTest.java | 62 ++++++++++ 7 files changed, 232 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java index c637c05..899426e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java @@ -323,6 +323,40 @@ public interface DBAccessor { void executeQuery(String query, boolean ignoreFailure) throws SQLException; /** + * Execute prepared statements + * @param query query to execute + * @param arguments list of arguments for prepared statement + * @throws SQLException + */ + void executePreparedQuery(String query, Object...arguments) throws SQLException; + + /** + * Execute prepared update statements + * @param query query to execute + * @param ignoreFailure determines if exceptions during query execution should be ignored + * @param arguments list of arguments for prepared statement + * @throws SQLException + */ + void executePreparedQuery(String query, boolean ignoreFailure, Object...arguments) throws SQLException; + + /** + * Execute prepared update statements + * @param query query to execute + * @param arguments list of arguments for prepared statement + * @throws SQLException + */ + void executePreparedUpdate(String query, Object...arguments) throws SQLException; + + /** + * Execute prepared statements which will not ignore failures + * @param query + * @param ignoreFailure + * @param arguments + * @throws SQLException + */ + void executePreparedUpdate(String query, boolean ignoreFailure, Object...arguments) throws SQLException; + + /** * Drop table from schema * @param tableName * @throws SQLException @@ -639,13 +673,12 @@ public interface DBAccessor { * the target column name * @param targetIDFieldName * the target id key name matched with {@code sourceIDFieldName} - * @param isColumnNullable - * should be target column nullable or not - * + * @param initialValue + * initial value for null-contained cells * @throws SQLException */ void moveColumnToAnotherTable(String sourceTableName, DBColumnInfo sourceColumn, String sourceIDFieldName, - String targetTableName, DBColumnInfo targetColumn, String targetIDFieldName, boolean isColumnNullable) throws SQLException; + String targetTableName, DBColumnInfo targetColumn, String targetIDFieldName, Object initialValue) throws SQLException; enum DbType { ORACLE, http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java index 0e2237c..c11a1dd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.orm; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; @@ -26,6 +27,7 @@ import java.sql.Blob; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -614,7 +616,7 @@ public class DBAccessorImpl implements DBAccessor { case POSTGRES: case SQL_ANYWHERE: case SQL_SERVER: - default: { + default: { // ToDo: getAddColumnStatement not supporting default clause for binary fields String query = dbmsHelper.getAddColumnStatement(tableName, columnInfo); executeQuery(query); break; @@ -862,6 +864,86 @@ public class DBAccessorImpl implements DBAccessor { } } + /** + {@inheritDoc} + */ + public void executePreparedQuery(String query, Object...arguments) throws SQLException { + executePreparedQuery(query, false, arguments); + } + + /** + {@inheritDoc} + */ + public void executePreparedQuery(String query, boolean ignoreFailure, Object...arguments) throws SQLException{ + LOG.info("Executing prepared query: {}", query); + + PreparedStatement preparedStatement = getConnection().prepareStatement(query); + + for (int i = 0; i < arguments.length; i++) { + if (arguments[i] instanceof byte[]) { + byte[] binaryData = (byte[]) arguments[i]; + // JDBC drivers supports only this function signature + preparedStatement.setBinaryStream(i+1, new ByteArrayInputStream(binaryData), binaryData.length); + } else { + preparedStatement.setObject(i+1, arguments[i]); + } + } + try { + preparedStatement.execute(); + } catch (SQLException e) { + if (!ignoreFailure){ + LOG.error("Error executing prepared query: {}", query, e); + throw e; + } else { + LOG.warn("Error executing prepared query: {}, errorCode={}, message = {}", query, e.getErrorCode(), e.getMessage()); + } + } finally { + if (preparedStatement != null) { + preparedStatement.close(); + } + } + } + + /** + {@inheritDoc} + */ + public void executePreparedUpdate(String query, Object...arguments) throws SQLException { + executePreparedQuery(query, false, arguments); + } + + /** + {@inheritDoc} + */ + public void executePreparedUpdate(String query, boolean ignoreFailure, Object...arguments) throws SQLException{ + LOG.info("Executing prepared query: {}", query); + + PreparedStatement preparedStatement = getConnection().prepareStatement(query); + + for (int i = 0; i <= arguments.length; i++) { + if (arguments[i] instanceof byte[]) { + byte[] binaryData = (byte[]) arguments[i]; + // JDBC drivers supports only this function signature + preparedStatement.setBinaryStream(i+1, new ByteArrayInputStream(binaryData), binaryData.length); + } else { + preparedStatement.setObject(i+1, arguments[i]); + } + } + try { + preparedStatement.executeUpdate(); + } catch (SQLException e) { + if (!ignoreFailure){ + LOG.error("Error executing prepared query: {}", query, e); + throw e; + } else { + LOG.warn("Error executing prepared query: {}, errorCode={}, message = {}", query, e.getErrorCode(), e.getMessage()); + } + } finally { + if (preparedStatement != null) { + preparedStatement.close(); + } + } + } + @Override public void dropTable(String tableName) throws SQLException { String query = dbmsHelper.getDropTableStatement(tableName); @@ -1320,31 +1402,36 @@ public class DBAccessorImpl implements DBAccessor { * the target column name * @param targetIDFieldName * the target id key name matched with {@code sourceIDFieldName} - * @param isColumnNullable - * should be target column nullable or not - * + * @param initialValue + * initial value for null-contained cells * @throws SQLException */ @Override public void moveColumnToAnotherTable(String sourceTableName, DBColumnInfo sourceColumn, String sourceIDFieldName, - String targetTableName, DBColumnInfo targetColumn, String targetIDFieldName, boolean isColumnNullable) throws SQLException { + String targetTableName, DBColumnInfo targetColumn, String targetIDFieldName, Object initialValue) throws SQLException { if (this.tableHasColumn(sourceTableName, sourceIDFieldName)) { final String moveSQL = dbmsHelper.getCopyColumnToAnotherTableStatement(sourceTableName, sourceColumn.getName(), sourceIDFieldName, targetTableName, targetColumn.getName(),targetIDFieldName); + final boolean isTargetColumnNullable = targetColumn.isNullable(); + + targetColumn.setNullable(true); // setting column nullable by default to move rows with null - targetColumn.setNullable(true); // setting column nullable by default + addColumn(targetTableName, targetColumn); + executeUpdate(moveSQL, false); - this.addColumn(targetTableName, targetColumn); - this.executeUpdate(moveSQL, false); + if (initialValue != null) { + String updateSQL = dbmsHelper.getColumnUpdateStatementWhereColumnIsNull(convertObjectName(targetTableName), + convertObjectName(targetColumn.getName()), convertObjectName(targetColumn.getName())); + + executePreparedUpdate(updateSQL, initialValue); + } - if (!isColumnNullable) { - // this can will trigger exception if some record is null - // ToDo: add default option - this.setColumnNullable(targetTableName, targetColumn.getName(), false); + if (!isTargetColumnNullable) { + setColumnNullable(targetTableName, targetColumn.getName(), false); } - this.dropColumn(sourceTableName, sourceColumn.getName()); + dropColumn(sourceTableName, sourceColumn.getName()); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java index fa9814a..00b02f1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java @@ -62,6 +62,16 @@ public interface DbmsHelper { String... columnNames); /** + * Generating update SQL statement for {@link DBAccessor#executePreparedUpdate} + * + * @param tableName name of the table + * @param setColumnName column name, value of which need to be set + * @param conditionColumnName column name for the condition + * @return + */ + String getColumnUpdateStatementWhereColumnIsNull(String tableName, String setColumnName, String conditionColumnName); + + /** * Gets DROP INDEX statement * * @param indexName http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java index 2a0bb93..4b4ce8d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java @@ -270,6 +270,18 @@ public class GenericDbmsHelper implements DbmsHelper { return createIndex; } + /** + * Generating update SQL statement for {@link DBAccessor#executePreparedUpdate} + * + * @param tableName name of the table + * @param setColumnName column name, value of which need to be set + * @param conditionColumnName column name for the condition + * @return + */ + @Override + public String getColumnUpdateStatementWhereColumnIsNull(String tableName, String setColumnName, String conditionColumnName){ + return "UPDATE " + tableName + " SET " + setColumnName + "=? WHERE " + conditionColumnName + " IS NULL;"; + } /** * {@inheritDoc} http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java index e6cb8ab..100c865 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java @@ -71,4 +71,14 @@ public class H2Helper extends GenericDbmsHelper { .append(" WHERE C.TABLEID = T.TABLEID AND T.TABLENAME = '").append(tableName).append("'"); return statement.toString(); } + + /** + {@inheritDoc} + */ + @Override + public String getCopyColumnToAnotherTableStatement(String sourceTable, String sourceColumnName, + String sourceIDColumnName, String targetTable, String targetColumnName, String targetIDColumnName) { + return String.format("UPDATE %1$s a SET %3$s = (SELECT b.%4$s FROM %2$s b WHERE b.%6$s = a.%5$s LIMIT 1)", + targetTable, sourceTable, targetColumnName, sourceColumnName, targetIDColumnName, sourceIDColumnName); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java index f4f69f7..afda1f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java @@ -128,8 +128,7 @@ public class UpgradeCatalog251 extends AbstractUpgradeCatalog { DBColumnInfo sourceColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false); DBColumnInfo targetColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, byte[].class, null, null, false); - dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, REQUEST_ID_COLUMN, REQUEST_TABLE, targetColumn, - REQUEST_ID_COLUMN, false); + dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, REQUEST_ID_COLUMN, REQUEST_TABLE, targetColumn, REQUEST_ID_COLUMN, "{}".getBytes()); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/462a3e8b/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java index 9522259..ea43bb2 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java @@ -19,6 +19,8 @@ package org.apache.ambari.server.orm; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.junit.matchers.JUnitMatchers.containsString; import java.io.ByteArrayInputStream; @@ -32,6 +34,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Vector; @@ -90,6 +93,18 @@ public class DBAccessorImplTest { dbAccessor.createTable(tableName, columns, "id"); } + private void createMyTable(String tableName, String...columnNames) throws Exception { + DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class); + + List<DBColumnInfo> columns = new ArrayList<>(); + columns.add(new DBColumnInfo("id", Long.class, null, null, false)); + for (String column: columnNames){ + columns.add(new DBColumnInfo(column, String.class, 20000, null, true)); + } + + dbAccessor.createTable(tableName, columns, "id"); + } + @Test public void testDbType() throws Exception { DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class); @@ -577,4 +592,51 @@ public class DBAccessorImplTest { assertEquals("'foo'", columnDefaultVal); } + + @Test + public void testMoveColumnToAnotherTable() throws Exception { + DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class); + String sourceTableName = getFreeTableName(); + String targetTableName = getFreeTableName(); + int testRowAmount = 10; + + createMyTable(sourceTableName, "col1", "col2"); + createMyTable(targetTableName, "col1"); + + for (Integer i=0; i < testRowAmount; i++){ + dbAccessor.insertRow(sourceTableName, + new String[] {"id", "col1", "col2"}, + new String[]{i.toString(), String.format("'source,1,%s'", i), String.format("'source,2,%s'", i)}, false); + + dbAccessor.insertRow(targetTableName, + new String[] {"id", "col1"}, + new String[]{i.toString(), String.format("'target,1,%s'", i)}, false); + } + + DBColumnInfo sourceColumn = new DBColumnInfo("col2", String.class, null, null, false); + DBColumnInfo targetColumn = new DBColumnInfo("col2", String.class, null, null, false); + + dbAccessor.moveColumnToAnotherTable(sourceTableName, sourceColumn, "id", + targetTableName, targetColumn, "id", "initial"); + + Statement statement = dbAccessor.getConnection().createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT col2 FROM " + targetTableName + " ORDER BY col2"); + + assertNotNull(resultSet); + + List<String> response = new LinkedList<>(); + + while (resultSet.next()){ + response.add(resultSet.getString(1)); + } + + assertEquals(testRowAmount, response.toArray().length); + + int i = 0; + for(String row: response){ + assertEquals(String.format("source,2,%s", i), row); + i++; + } + + } }