Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2231#discussion_r157260327
--- Diff:
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java
---
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mssql;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
+import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
+import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CaptureChangeMSSQLTest {
+
+ private TestRunner runner;
+ private MockCaptureChangeMSSQL processor;
+ private final static String DB_LOCATION = "target/db_qdt";
+
+
+ @BeforeClass
+ public static void setupBeforeClass() throws IOException, SQLException
{
+ System.setProperty("derby.stream.error.file", "target/derby.log");
+
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
+
+ // load CDC schema to database
+ final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ final Connection con = dbcp.getConnection();
+ Statement stmt = con.createStatement();
+
+ stmt.execute("CREATE TABLE cdc.change_tables(\n" +
+ "object_id int,\n" +
+ //These four columns are computed from
object_id/source_object_id in MS SQL, but for testing we put them as strings
+ "schemaName varchar(128),\n" +
+ "tableName varchar(128),\n" +
+ "sourceSchemaName varchar(128),\n" +
+ "sourceTableName varchar(128),\n" +
+
+ "version int,\n" +
+ "capture_instance varchar(128),\n" +
+ "start_lsn int,\n" +
+ "end_lsn int,\n" +
+ "supports_net_changes BOOLEAN,\n" +
+ "has_drop_pending BOOLEAN,\n" +
+ "role_name varchar(128),\n" +
+ "index_name varchar(128),\n" +
+ "filegroup_name varchar(128),\n" +
+ "create_date TIMESTAMP,\n" +
+ "partition_switch BOOLEAN)");
+
+ stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" +
+ "start_lsn int,\n" +
+ "tran_begin_time TIMESTAMP,\n" +
+ "tran_end_time TIMESTAMP,\n" +
+ "tran_id int,\n" +
+ "tran_begin_lsn int)");
+
+ stmt.execute("CREATE TABLE cdc.index_columns(\n" +
+ "object_id int,\n" +
+ "column_name varchar(128),\n" +
+ "index_ordinal int,\n" +
+ "column_id int)");
+
+ stmt.execute("CREATE TABLE cdc.captured_columns(\n" +
+ "object_id int,\n" +
+ "column_name varchar(128),\n" +
+ "column_id int,\n" +
+ "column_type varchar(128),\n" +
+ "column_ordinal int,\n" +
+ "is_computed BOOLEAN)");
+ }
+
+ @AfterClass
+ public static void cleanUpAfterClass() throws Exception {
+ try {
+ DriverManager.getConnection("jdbc:derby:" + DB_LOCATION +
";shutdown=true");
+ } catch (SQLNonTransientConnectionException e) {
+ // Do nothing, this is what happens at Derby shutdown
+ }
+ // remove previous test database, if any
+ final File dbLocation = new File(DB_LOCATION);
+ try {
+ FileUtils.deleteFile(dbLocation, true);
+ } catch (IOException ioe) {
+ // Do nothing, may not have existed
+ }
+ }
+
+ @Before
+ public void setup() throws InitializationException, IOException,
SQLException {
+ final DBCPService dbcp = new DBCPServiceSimpleImpl();
+ final Map<String, String> dbcpProperties = new HashMap<>();
+
+ processor = new MockCaptureChangeMSSQL();
+ runner = TestRunners.newTestRunner(processor);
+
+ runner.addControllerService("dbcp", dbcp, dbcpProperties);
+ runner.enableControllerService(dbcp);
+ runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp");
+
+ final MockRecordWriter writerService = new MockRecordWriter(null,
false);
+ runner.addControllerService("writer", writerService);
+ runner.enableControllerService(writerService);
+ runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer");
+
+ runner.getStateManager().clear(Scope.CLUSTER);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ runner.getStateManager().clear(Scope.CLUSTER);
+ runner = null;
+ }
+
+ @Test
+ public void testSelectGenerator(){
+ MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
+
+ List<ColumnDefinition> columns = new ArrayList<>();
+ columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1,
true));
+ columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName",
2, false));
+ columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName",
3, false));
+
+ MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names",
+ "dbo", "dbo_Names_CT", 1000L, columns);
+
+ String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, null);
+
+ Assert.assertEquals("SELECT t.tran_begin_time\n" +
+ ",t.tran_end_time \"tran_end_time\"\n" +
+ ",CAST(t.tran_id AS bigint) trans_id\n" +
+ ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" +
+ ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" +
+ ",\"o\".\"__$operation\" operation\n" +
+ ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" +
+ ",\"o\".\"ID\"\n" +
+ ",\"o\".\"LastName\"\n" +
+ ",\"o\".\"FirstName\"\n" +
+ ",CURRENT_TIMESTAMP EXTRACT_TIME\n" +
+ "FROM cdc.\"Names\" \"o\"\n" +
+ "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn =
\"o\".\"__$start_lsn\")\n" +
+ "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint),
\"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime);
+
+ String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, new
Timestamp(0));
+
+ Assert.assertEquals("SELECT t.tran_begin_time\n" +
+ ",t.tran_end_time \"tran_end_time\"\n" +
+ ",CAST(t.tran_id AS bigint) trans_id\n" +
+ ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" +
+ ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" +
+ ",\"o\".\"__$operation\" operation\n" +
+ ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" +
+ ",\"o\".\"ID\"\n" +
+ ",\"o\".\"LastName\"\n" +
+ ",\"o\".\"FirstName\"\n" +
+ ",CURRENT_TIMESTAMP EXTRACT_TIME\n" +
+ "FROM cdc.\"Names\" \"o\"\n" +
+ "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn =
\"o\".\"__$start_lsn\")\n" +
+ "WHERE t.tran_end_time > ?\n" +
+ "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint),
\"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime);
+ }
+
+ @Test
+ public void testRetrieveAllChanges() throws SQLException, IOException {
+ setupNamesTable();
+
--- End diff --
Line 215 is not blank, so it throws a CheckStyle violation
---