Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2231#discussion_r157257350
  
    --- Diff: 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java
 ---
    @@ -0,0 +1,264 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.cdc.mssql;
    +
    +import org.apache.nifi.cdc.CDCException;
    +import org.apache.nifi.cdc.event.ColumnDefinition;
    +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
    +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
    +
    +import java.sql.Connection;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class MSSQLCDCUtils {
    +    private static final String _columnSplit = "\n,";
    +
    +    final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" +
    +            "  DB_NAME() AS [databaseName], \n" +
    +            "  SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS 
[schemaName], \n" +
    +            "  OBJECT_NAME(object_id) AS [tableName], \n" +
    +            "  SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId')) 
AS [sourceSchemaName],\n" +
    +            "  OBJECT_NAME(source_object_id) AS [sourceTableName] \n" +
    +            "FROM [cdc].[change_tables]";
    +
    +    final String LIST_TABLE_COLUMNS = "select cc.object_id\n" +
    +            ",cc.column_name\n" +
    +            ",cc.column_id\n" +
    +            ",cc.column_type\n" +
    +            ",cc.column_ordinal\n" +
    +            ",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" +
    +            "FROM cdc.captured_columns cc\n" +
    +            "LEFT OUTER JOIN cdc.index_columns ic ON \n" +
    +            "(ic.object_id = cc.object_id AND ic.column_name = 
cc.column_name)\n" +
    +            "where cc.object_id=?\n" +
    +            "ORDER BY cc.column_ordinal";
    +
    +    public String getLIST_CHANGE_TRACKING_TABLES_SQL(){
    +        return LIST_CHANGE_TRACKING_TABLES_SQL;
    +    }
    +
    +    public String getLIST_TABLE_COLUMNS(){
    +        return LIST_TABLE_COLUMNS;
    +    }
    +
    +    public String getCURRENT_TIMESTAMP(){
    +        return "CURRENT_TIMESTAMP";
    +    }
    +
    +    public List<MSSQLTableInfo> getCDCTableList(Connection con) throws 
SQLException, CDCException {
    +        ArrayList<MSSQLTableInfo> cdcTables = new ArrayList<>();
    +
    +        try(final Statement st = con.createStatement()){
    +            final ResultSet resultSet = 
st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL());
    +
    +            while (resultSet.next()) {
    +                int objectId = resultSet.getInt("object_id");
    +                String databaseName = resultSet.getString("databaseName");
    +                String schemaName = resultSet.getString("schemaName");
    +                String tableName = resultSet.getString("tableName");
    +                String sourceSchemaName = 
resultSet.getString("sourceSchemaName");
    +                String sourceTableName = 
resultSet.getString("sourceTableName");
    +
    +                MSSQLTableInfo ti = new MSSQLTableInfo(databaseName, 
schemaName, tableName, sourceSchemaName, sourceTableName, 
Integer.toUnsignedLong(objectId), null);
    +                cdcTables.add(ti);
    +            }
    +
    +            for (MSSQLTableInfo ti:cdcTables) {
    +                List<ColumnDefinition> tableColums = 
getCaptureColumns(con, ti.getTableId());
    +
    +                ti.setColumns(tableColums);
    +            }
    +        }
    +
    +        return cdcTables;
    +    }
    +
    +    public List<ColumnDefinition> getCaptureColumns(Connection con, long 
objectId) throws SQLException, CDCException {
    +        ArrayList<ColumnDefinition> tableColumns = new ArrayList<>();
    +        try(final PreparedStatement st = 
con.prepareStatement(getLIST_TABLE_COLUMNS())){
    +            st.setLong(1, objectId);
    +
    +            final ResultSet resultSet = st.executeQuery();
    +            while (resultSet.next()) {
    +                String columnName = resultSet.getString("column_name");
    +                int columnId = resultSet.getInt("column_id");
    +                String columnType = resultSet.getString("column_type");
    +                int columnOrdinal = resultSet.getInt("column_ordinal");
    +                int isColumnKey = resultSet.getInt("key");
    +
    +                int jdbcType = TranslateMSSQLTypeToJDBCTypes(columnType);
    +
    +                //get column list
    +                MSSQLColumnDefinition col = new 
MSSQLColumnDefinition(jdbcType, columnName, columnOrdinal, isColumnKey==1);
    +                tableColumns.add(col);
    +            }
    +        } catch (SQLException e) {
    --- End diff --
    
    You don't really need this catch block if all it does is rethrow the same 
exception. Maybe wrap it in a CDCException, or replace the catch block with an 
empty finally block?


---

Reply via email to