Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2231#discussion_r157257350
--- Diff:
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/MSSQLCDCUtils.java
---
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mssql;
+
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
+import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
+
+import java.sql.Connection;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MSSQLCDCUtils {
+ // Separator consisting of a newline followed by a comma. Its use is outside this excerpt;
+ // presumably it joins column expressions in generated SQL (TODO confirm against callers).
+ private static final String _columnSplit = "\n,";
+
+ // Query over [cdc].[change_tables] returning one row per change table:
+ // its object_id, the current database name, the change table's own schema and
+ // table name, and the schema and table name of the source table it tracks.
+ // Executed by getCDCTableList via getLIST_CHANGE_TRACKING_TABLES_SQL().
+ final String LIST_CHANGE_TRACKING_TABLES_SQL = "SELECT object_id,\n" +
+ " DB_NAME() AS [databaseName], \n" +
+ " SCHEMA_NAME(OBJECTPROPERTY(object_id, 'SchemaId')) AS
[schemaName], \n" +
+ " OBJECT_NAME(object_id) AS [tableName], \n" +
+ " SCHEMA_NAME(OBJECTPROPERTY(source_object_id, 'SchemaId'))
AS [sourceSchemaName],\n" +
+ " OBJECT_NAME(source_object_id) AS [sourceTableName] \n" +
+ "FROM [cdc].[change_tables]";
+
+ // Parameterized query (single ? = change table object_id) over cdc.captured_columns.
+ // Returns each captured column's name, id, type and ordinal, ordered by ordinal.
+ // The LEFT OUTER JOIN to cdc.index_columns yields the "key" flag: 1 when the column
+ // has a matching row in cdc.index_columns for the same object, otherwise 0.
+ final String LIST_TABLE_COLUMNS = "select cc.object_id\n" +
+ ",cc.column_name\n" +
+ ",cc.column_id\n" +
+ ",cc.column_type\n" +
+ ",cc.column_ordinal\n" +
+ ",CASE WHEN ic.object_id IS NULL THEN 0 ELSE 1 END \"key\"\n" +
+ "FROM cdc.captured_columns cc\n" +
+ "LEFT OUTER JOIN cdc.index_columns ic ON \n" +
+ "(ic.object_id = cc.object_id AND ic.column_name =
cc.column_name)\n" +
+ "where cc.object_id=?\n" +
+ "ORDER BY cc.column_ordinal";
+
+ /**
+  * Returns the SQL text that lists the CDC change tables.
+  *
+  * @return the value of {@code LIST_CHANGE_TRACKING_TABLES_SQL}
+  */
+ public String getLIST_CHANGE_TRACKING_TABLES_SQL() {
+     final String changeTablesSql = this.LIST_CHANGE_TRACKING_TABLES_SQL;
+     return changeTablesSql;
+ }
+
+ /**
+  * Returns the parameterized SQL text that lists the captured columns of one change table.
+  *
+  * @return the value of {@code LIST_TABLE_COLUMNS}
+  */
+ public String getLIST_TABLE_COLUMNS() {
+     final String tableColumnsSql = this.LIST_TABLE_COLUMNS;
+     return tableColumnsSql;
+ }
+
+ /**
+  * Returns the literal text {@code CURRENT_TIMESTAMP}.
+  * NOTE(review): presumably spliced into generated SQL as the server-side "now"
+  * expression; confirm against callers.
+  *
+  * @return the string {@code "CURRENT_TIMESTAMP"}
+  */
+ public String getCURRENT_TIMESTAMP() {
+     final String currentTimestampExpression = "CURRENT_TIMESTAMP";
+     return currentTimestampExpression;
+ }
+
+ public List<MSSQLTableInfo> getCDCTableList(Connection con) throws
SQLException, CDCException {
+ ArrayList<MSSQLTableInfo> cdcTables = new ArrayList<>();
+
+ try(final Statement st = con.createStatement()){
+ final ResultSet resultSet =
st.executeQuery(getLIST_CHANGE_TRACKING_TABLES_SQL());
+
+ while (resultSet.next()) {
+ int objectId = resultSet.getInt("object_id");
+ String databaseName = resultSet.getString("databaseName");
+ String schemaName = resultSet.getString("schemaName");
+ String tableName = resultSet.getString("tableName");
+ String sourceSchemaName =
resultSet.getString("sourceSchemaName");
+ String sourceTableName =
resultSet.getString("sourceTableName");
+
+ MSSQLTableInfo ti = new MSSQLTableInfo(databaseName,
schemaName, tableName, sourceSchemaName, sourceTableName,
Integer.toUnsignedLong(objectId), null);
+ cdcTables.add(ti);
+ }
+
+ for (MSSQLTableInfo ti:cdcTables) {
+ List<ColumnDefinition> tableColums =
getCaptureColumns(con, ti.getTableId());
+
+ ti.setColumns(tableColums);
+ }
+ }
+
+ return cdcTables;
+ }
+
+ public List<ColumnDefinition> getCaptureColumns(Connection con, long
objectId) throws SQLException, CDCException {
+ ArrayList<ColumnDefinition> tableColumns = new ArrayList<>();
+ try(final PreparedStatement st =
con.prepareStatement(getLIST_TABLE_COLUMNS())){
+ st.setLong(1, objectId);
+
+ final ResultSet resultSet = st.executeQuery();
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("column_name");
+ int columnId = resultSet.getInt("column_id");
+ String columnType = resultSet.getString("column_type");
+ int columnOrdinal = resultSet.getInt("column_ordinal");
+ int isColumnKey = resultSet.getInt("key");
+
+ int jdbcType = TranslateMSSQLTypeToJDBCTypes(columnType);
+
+ //get column list
+ MSSQLColumnDefinition col = new
MSSQLColumnDefinition(jdbcType, columnName, columnOrdinal, isColumnKey==1);
+ tableColumns.add(col);
+ }
+ } catch (SQLException e) {
--- End diff --
You don't really need this catch block just to rethrow the same exception. Maybe
wrap it in a CDCException, or replace the catch block with an empty finally block?
---