Repository: ambari
Updated Branches:
  refs/heads/trunk 6fe7cc283 -> 8f06a5b01


AMBARI-21291 Schema error during upgrade related to request table (dgrinenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8f06a5b0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8f06a5b0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8f06a5b0

Branch: refs/heads/trunk
Commit: 8f06a5b01719b825e8bcee2a2a56545a1c3052a9
Parents: 6fe7cc2
Author: Dmytro Grinenko <hapyles...@apache.org>
Authored: Fri Jun 23 08:13:54 2017 +0300
Committer: Dmytro Grinenko <hapyles...@apache.org>
Committed: Fri Jun 23 08:13:54 2017 +0300

----------------------------------------------------------------------
 .../apache/ambari/server/orm/DBAccessor.java    |  41 ++++++-
 .../ambari/server/orm/DBAccessorImpl.java       | 113 ++++++++++++++++---
 .../server/orm/helpers/dbms/DbmsHelper.java     |  10 ++
 .../orm/helpers/dbms/GenericDbmsHelper.java     |  12 ++
 .../server/orm/helpers/dbms/H2Helper.java       |  10 ++
 .../server/upgrade/UpgradeCatalog251.java       |   3 +-
 .../ambari/server/orm/DBAccessorImplTest.java   |  62 ++++++++++
 7 files changed, 232 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
index 4f29d61..4ddaf26 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java
@@ -323,6 +323,40 @@ public interface DBAccessor {
   void executeQuery(String query, boolean ignoreFailure) throws SQLException;
 
   /**
+   * Executes a prepared statement; execution failures are propagated to the caller
+   * @param query query to execute, with {@code ?} placeholders for the arguments
+   * @param arguments values bound to the placeholders, in the order given
+   * @throws SQLException if preparing or executing the statement fails
+   */
+  void executePreparedQuery(String query, Object...arguments) throws 
SQLException;
+
+  /**
+   * Executes a prepared statement, optionally ignoring execution failures
+   * @param query query to execute, with {@code ?} placeholders for the arguments
+   * @param ignoreFailure determines if exceptions during query execution should be ignored
+   * @param arguments values bound to the placeholders, in the order given
+   * @throws SQLException if the statement fails and {@code ignoreFailure} is {@code false}
+   */
+  void executePreparedQuery(String query, boolean ignoreFailure, 
Object...arguments) throws SQLException;
+
+  /**
+   * Executes a prepared update statement; execution failures are propagated to the caller
+   * @param query update statement to execute, with {@code ?} placeholders for the arguments
+   * @param arguments values bound to the placeholders, in the order given
+   * @throws SQLException if preparing or executing the statement fails
+   */
+  void executePreparedUpdate(String query, Object...arguments) throws 
SQLException;
+
+  /**
+   * Executes a prepared update statement, optionally ignoring execution failures
+   * @param query update statement to execute, with {@code ?} placeholders for the arguments
+   * @param ignoreFailure determines if exceptions during statement execution should be ignored
+   * @param arguments values bound to the placeholders, in the order given
+   * @throws SQLException if the statement fails and {@code ignoreFailure} is {@code false}
+   */
+  void executePreparedUpdate(String query, boolean ignoreFailure, 
Object...arguments) throws SQLException;
+
+  /**
    * Drop table from schema
    * @param tableName
    * @throws SQLException
@@ -638,13 +672,12 @@ public interface DBAccessor {
    *          the target column name
    * @param targetIDFieldName
    *          the target id key name matched with {@code sourceIDFieldName}
-   * @param isColumnNullable
-   *          should be target column nullable or not
-   *
+   * @param initialValue
+   *          initial value for null-contained cells
    * @throws SQLException
    */
   void moveColumnToAnotherTable(String sourceTableName, DBColumnInfo 
sourceColumn, String sourceIDFieldName,
-       String targetTableName, DBColumnInfo targetColumn, String 
targetIDFieldName, boolean isColumnNullable) throws SQLException;
+       String targetTableName, DBColumnInfo targetColumn, String 
targetIDFieldName, Object initialValue) throws SQLException;
 
   enum DbType {
     ORACLE,

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
index 9077362..13e7d7d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.orm;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -26,6 +27,7 @@ import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -614,7 +616,7 @@ public class DBAccessorImpl implements DBAccessor {
       case POSTGRES:
       case SQL_ANYWHERE:
       case SQL_SERVER:
-      default: {
+      default: {  // ToDo: getAddColumnStatement not supporting default clause 
for binary fields
         String query = dbmsHelper.getAddColumnStatement(tableName, columnInfo);
         executeQuery(query);
         break;
@@ -862,6 +864,86 @@ public class DBAccessorImpl implements DBAccessor {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   * <p>Delegates to {@link #executePreparedQuery(String, boolean, Object...)} with
+   * {@code ignoreFailure} set to {@code false}, so execution failures are rethrown.
+   */
+  @Override
+  public void executePreparedQuery(String query, Object...arguments) throws SQLException {
+    executePreparedQuery(query, false, arguments);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>{@code byte[]} arguments are bound as binary streams; every other argument is bound
+   * with {@code setObject}. The statement is always closed, including when binding fails.
+   */
+  @Override
+  public void executePreparedQuery(String query, boolean ignoreFailure, Object...arguments) throws SQLException {
+    LOG.info("Executing prepared query: {}", query);
+
+    // try-with-resources closes the statement even when binding an argument throws
+    try (PreparedStatement preparedStatement = getConnection().prepareStatement(query)) {
+      for (int i = 0; i < arguments.length; i++) {
+        // JDBC parameter indices are 1-based
+        if (arguments[i] instanceof byte[]) {
+          byte[] binaryData = (byte[]) arguments[i];
+          // JDBC drivers supports only this function signature
+          preparedStatement.setBinaryStream(i + 1, new ByteArrayInputStream(binaryData), binaryData.length);
+        } else {
+          preparedStatement.setObject(i + 1, arguments[i]);
+        }
+      }
+
+      // only execution failures are subject to ignoreFailure; binding errors always propagate
+      try {
+        preparedStatement.execute();
+      } catch (SQLException e) {
+        if (!ignoreFailure) {
+          LOG.error("Error executing prepared query: {}", query, e);
+          throw e;
+        }
+        LOG.warn("Error executing prepared query: {}, errorCode={}, message = {}", query, e.getErrorCode(), e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Delegates to {@link #executePreparedUpdate(String, boolean, Object...)} with
+   * {@code ignoreFailure} set to {@code false}, so execution failures are rethrown.
+   */
+  @Override
+  public void executePreparedUpdate(String query, Object...arguments) throws SQLException {
+    // must target the update overload; the query overload calls execute() instead of executeUpdate()
+    executePreparedUpdate(query, false, arguments);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>{@code byte[]} arguments are bound as binary streams; every other argument is bound
+   * with {@code setObject}. The statement is always closed, including when binding fails.
+   */
+  @Override
+  public void executePreparedUpdate(String query, boolean ignoreFailure, Object...arguments) throws SQLException {
+    LOG.info("Executing prepared query: {}", query);
+
+    // try-with-resources closes the statement even when binding an argument throws
+    try (PreparedStatement preparedStatement = getConnection().prepareStatement(query)) {
+      // strict '<' bound: the array holds indices 0..length-1, JDBC parameters are 1..length
+      for (int i = 0; i < arguments.length; i++) {
+        if (arguments[i] instanceof byte[]) {
+          byte[] binaryData = (byte[]) arguments[i];
+          // JDBC drivers supports only this function signature
+          preparedStatement.setBinaryStream(i + 1, new ByteArrayInputStream(binaryData), binaryData.length);
+        } else {
+          preparedStatement.setObject(i + 1, arguments[i]);
+        }
+      }
+
+      // only execution failures are subject to ignoreFailure; binding errors always propagate
+      try {
+        preparedStatement.executeUpdate();
+      } catch (SQLException e) {
+        if (!ignoreFailure) {
+          LOG.error("Error executing prepared query: {}", query, e);
+          throw e;
+        }
+        LOG.warn("Error executing prepared query: {}, errorCode={}, message = {}", query, e.getErrorCode(), e.getMessage());
+      }
+    }
+  }
+
   @Override
   public void dropTable(String tableName) throws SQLException {
     String query = dbmsHelper.getDropTableStatement(tableName);
@@ -1321,31 +1403,36 @@ public class DBAccessorImpl implements DBAccessor {
    *          the target column name
    * @param targetIDFieldName
    *          the target id key name matched with {@code sourceIDFieldName}
-   * @param isColumnNullable
-   *          should be target column nullable or not
-   *
+   * @param initialValue
+   *          initial value for null-contained cells
    * @throws SQLException
    */
   @Override
   public void moveColumnToAnotherTable(String sourceTableName, DBColumnInfo 
sourceColumn, String sourceIDFieldName,
-              String targetTableName, DBColumnInfo targetColumn, String 
targetIDFieldName,  boolean isColumnNullable) throws SQLException {
+              String targetTableName, DBColumnInfo targetColumn, String 
targetIDFieldName, Object initialValue) throws SQLException {
 
     if (this.tableHasColumn(sourceTableName, sourceIDFieldName)) {
 
       final String moveSQL = 
dbmsHelper.getCopyColumnToAnotherTableStatement(sourceTableName, 
sourceColumn.getName(),
         sourceIDFieldName, targetTableName, 
targetColumn.getName(),targetIDFieldName);
+      final boolean isTargetColumnNullable = targetColumn.isNullable();
+
+      targetColumn.setNullable(true);  // setting column nullable by default 
to move rows with null
 
-      targetColumn.setNullable(true);  // setting column nullable by default
+      addColumn(targetTableName, targetColumn);
+      executeUpdate(moveSQL, false);
 
-      this.addColumn(targetTableName, targetColumn);
-      this.executeUpdate(moveSQL, false);
+      if (initialValue != null) {
+        String updateSQL = 
dbmsHelper.getColumnUpdateStatementWhereColumnIsNull(convertObjectName(targetTableName),
+          convertObjectName(targetColumn.getName()), 
convertObjectName(targetColumn.getName()));
+
+        executePreparedUpdate(updateSQL, initialValue);
+      }
 
-      if (!isColumnNullable) {
-        // this can will trigger exception if some record is null
-        // ToDo: add default option
-        this.setColumnNullable(targetTableName, targetColumn.getName(), false);
+      if (!isTargetColumnNullable) {
+        setColumnNullable(targetTableName, targetColumn.getName(), false);
       }
-      this.dropColumn(sourceTableName, sourceColumn.getName());
+      dropColumn(sourceTableName, sourceColumn.getName());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java
index 7f74bb0..b30d01b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/DbmsHelper.java
@@ -62,6 +62,16 @@ public interface DbmsHelper {
                                  String... columnNames);
 
   /**
+   * Generating update SQL statement for {@link DBAccessor#executePreparedUpdate}
+   *
+   * @param tableName name of the table
+   * @param setColumnName column name, value of which need to be set
+   * @param conditionColumnName column name for the condition
+   * @return SQL UPDATE text that sets {@code setColumnName} on rows where {@code conditionColumnName} is NULL
+   */
+  String getColumnUpdateStatementWhereColumnIsNull(String tableName, String 
setColumnName, String conditionColumnName);
+
+  /**
    * Gets DROP INDEX statement
    *
    * @param indexName

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java
index afa4ac7..56274c5 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/GenericDbmsHelper.java
@@ -270,6 +270,18 @@ public class GenericDbmsHelper implements DbmsHelper {
     return createIndex;
   }
 
+  /**
+   * Generating update SQL statement for {@link DBAccessor#executePreparedUpdate}
+   *
+   * @param tableName name of the table
+   * @param setColumnName column name, value of which need to be set
+   * @param conditionColumnName column name for the condition
+   * @return UPDATE statement with a single {@code ?} placeholder for the new value, restricted
+   *         to rows where {@code conditionColumnName} is NULL
+   */
+  @Override
+  public String getColumnUpdateStatementWhereColumnIsNull(String tableName, String setColumnName, String conditionColumnName){
+    // no trailing ';': the text is handed to JDBC as one statement, and some drivers
+    // (e.g. Oracle) reject a statement terminator
+    return "UPDATE " + tableName + " SET " + setColumnName + "=? WHERE " + conditionColumnName + " IS NULL";
+  }
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java
index 4336624..91905e4 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/helpers/dbms/H2Helper.java
@@ -71,4 +71,14 @@ public class H2Helper extends GenericDbmsHelper {
       .append(" WHERE C.TABLEID = T.TABLEID AND T.TABLENAME = 
'").append(tableName).append("'");
     return statement.toString();
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getCopyColumnToAnotherTableStatement(String sourceTable, String sourceColumnName,
+                                                     String sourceIDColumnName, String targetTable, String targetColumnName, String targetIDColumnName) {
+    // alias "a" is the target table, alias "b" the source table; the sub-select is capped with LIMIT 1
+    final String template =
+      "UPDATE %1$s a SET %3$s = (SELECT b.%4$s FROM %2$s b WHERE b.%6$s = a.%5$s LIMIT 1)";
+    // order matches the positional indexes (%1$s .. %6$s) used by the template
+    final Object[] names = {
+      targetTable, sourceTable, targetColumnName, sourceColumnName, targetIDColumnName, sourceIDColumnName
+    };
+    return String.format(template, names);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
index 119d9ce..07c7d3e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog251.java
@@ -166,8 +166,7 @@ public class UpgradeCatalog251 extends 
AbstractUpgradeCatalog {
     DBColumnInfo sourceColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, 
byte[].class, null, null, false);
     DBColumnInfo targetColumn = new DBColumnInfo(CLUSTER_HOST_INFO_COLUMN, 
byte[].class, null, null, false);
 
-    dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, 
REQUEST_ID_COLUMN, REQUEST_TABLE, targetColumn,
-      REQUEST_ID_COLUMN, false);
+    dbAccessor.moveColumnToAnotherTable(STAGE_TABLE, sourceColumn, 
REQUEST_ID_COLUMN, REQUEST_TABLE, targetColumn, REQUEST_ID_COLUMN, 
"{}".getBytes());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/8f06a5b0/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
index 9aaa80f..19763c4 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/DBAccessorImplTest.java
@@ -19,6 +19,8 @@
 package org.apache.ambari.server.orm;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.matchers.JUnitMatchers.containsString;
 
 import java.io.ByteArrayInputStream;
@@ -32,6 +34,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
@@ -90,6 +93,18 @@ public class DBAccessorImplTest {
     dbAccessor.createTable(tableName, columns, "id");
   }
 
+  /**
+   * Creates a test table made of a {@code Long} "id" column followed by one {@code String}
+   * column per entry of {@code columnNames}; "id" is passed to {@code createTable} as the
+   * key column (presumably the primary key - confirm against DBAccessor#createTable).
+   */
+  private void createMyTable(String tableName, String...columnNames) throws Exception {
+    final DBAccessorImpl accessor = injector.getInstance(DBAccessorImpl.class);
+    final List<DBColumnInfo> tableColumns = new ArrayList<>();
+
+    tableColumns.add(new DBColumnInfo("id", Long.class, null, null, false));
+    for (int idx = 0; idx < columnNames.length; idx++) {
+      tableColumns.add(new DBColumnInfo(columnNames[idx], String.class, 20000, null, true));
+    }
+
+    accessor.createTable(tableName, tableColumns, "id");
+  }
+
   @Test
   public void testDbType() throws Exception {
     DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class);
@@ -577,4 +592,51 @@ public class DBAccessorImplTest {
 
     assertEquals("'foo'", columnDefaultVal);
    }
+
+  /**
+   * Moves "col2" from a source table to a target table whose rows share the same ids and
+   * verifies that every target row received the matching source value.
+   */
+  @Test
+  public void testMoveColumnToAnotherTable() throws Exception {
+    DBAccessorImpl dbAccessor = injector.getInstance(DBAccessorImpl.class);
+    String sourceTableName = getFreeTableName();
+    String targetTableName = getFreeTableName();
+    int testRowAmount = 10;
+
+    createMyTable(sourceTableName, "col1", "col2");
+    createMyTable(targetTableName, "col1");
+
+    for (int i = 0; i < testRowAmount; i++) {
+      String id = Integer.toString(i);
+
+      dbAccessor.insertRow(sourceTableName,
+        new String[] {"id", "col1", "col2"},
+        new String[] {id, String.format("'source,1,%s'", i), String.format("'source,2,%s'", i)}, false);
+
+      dbAccessor.insertRow(targetTableName,
+        new String[] {"id", "col1"},
+        new String[] {id, String.format("'target,1,%s'", i)}, false);
+    }
+
+    DBColumnInfo sourceColumn = new DBColumnInfo("col2", String.class, null, null, false);
+    DBColumnInfo targetColumn = new DBColumnInfo("col2", String.class, null, null, false);
+
+    dbAccessor.moveColumnToAnotherTable(sourceTableName, sourceColumn, "id",
+      targetTableName, targetColumn, "id", "initial");
+
+    List<String> response = new LinkedList<>();
+
+    // close the statement and result set instead of leaking them into later tests
+    try (Statement statement = dbAccessor.getConnection().createStatement();
+         ResultSet resultSet = statement.executeQuery("SELECT col2 FROM " + targetTableName + " ORDER BY col2")) {
+      assertNotNull(resultSet);
+
+      while (resultSet.next()) {
+        response.add(resultSet.getString(1));
+      }
+    }
+
+    assertEquals(testRowAmount, response.size());
+
+    // ORDER BY col2 sorts as text; that equals numeric order only while suffixes are single digits
+    int i = 0;
+    for (String row : response) {
+      assertEquals(String.format("source,2,%s", i), row);
+      i++;
+    }
+  }
 }

Reply via email to