[ https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353235#comment-16353235 ]
ASF GitHub Bot commented on NIFI-4521: -------------------------------------- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/2231#discussion_r166171052 --- Diff: nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java --- @@ -0,0 +1,792 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mssql; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition; +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo; +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.file.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CaptureChangeMSSQLTest { + + private TestRunner runner; + private MockCaptureChangeMSSQL processor; + private final static String DB_LOCATION = "target/db_qdt"; + + + @BeforeClass + public static void setupBeforeClass() throws IOException, SQLException { + System.setProperty("derby.stream.error.file", "target/derby.log"); + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + + // 
load CDC schema to database + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Connection con = dbcp.getConnection(); + Statement stmt = con.createStatement(); + + stmt.execute("CREATE TABLE cdc.change_tables(\n" + + "object_id int,\n" + + //These four columns are computed from object_id/source_object_id in MS SQL, but for testing we put them as strings + "schemaName varchar(128),\n" + + "tableName varchar(128),\n" + + "sourceSchemaName varchar(128),\n" + + "sourceTableName varchar(128),\n" + + + "version int,\n" + + "capture_instance varchar(128),\n" + + "start_lsn int,\n" + + "end_lsn int,\n" + + "supports_net_changes BOOLEAN,\n" + + "has_drop_pending BOOLEAN,\n" + + "role_name varchar(128),\n" + + "index_name varchar(128),\n" + + "filegroup_name varchar(128),\n" + + "create_date TIMESTAMP,\n" + + "partition_switch BOOLEAN)"); + + stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" + + "start_lsn int,\n" + + "tran_begin_time TIMESTAMP,\n" + + "tran_end_time TIMESTAMP,\n" + + "tran_id int,\n" + + "tran_begin_lsn int)"); + + stmt.execute("CREATE TABLE cdc.index_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "index_ordinal int,\n" + + "column_id int)"); + + stmt.execute("CREATE TABLE cdc.captured_columns(\n" + + "object_id int,\n" + + "column_name varchar(128),\n" + + "column_id int,\n" + + "column_type varchar(128),\n" + + "column_ordinal int,\n" + + "is_computed BOOLEAN)"); + } + + @AfterClass + public static void cleanUpAfterClass() throws Exception { + try { + DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true"); + } catch (SQLNonTransientConnectionException e) { + // Do nothing, this is what happens at Derby shutdown + } + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + try { + FileUtils.deleteFile(dbLocation, true); + } catch (IOException ioe) { + // Do nothing, may not have existed + } + } + + @Before + public void setup() throws InitializationException, 
IOException, SQLException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map<String, String> dbcpProperties = new HashMap<>(); + + processor = new MockCaptureChangeMSSQL(); + runner = TestRunners.newTestRunner(processor); + + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp"); + + final MockRecordWriter writerService = new MockRecordWriter(null, false); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer"); + + runner.getStateManager().clear(Scope.CLUSTER); + } + + @After + public void teardown() throws IOException { + runner.getStateManager().clear(Scope.CLUSTER); + runner = null; + } + + @Test + public void testSelectGenerator(){ + MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils(); + + List<ColumnDefinition> columns = new ArrayList<>(); + columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1, true)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName", 2, false)); + columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName", 3, false)); + + MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names", + "dbo", "dbo_Names_CT", 1000L, columns); + + String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, null); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + 
"ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime); + + String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, new Timestamp(0)); + + Assert.assertEquals("SELECT t.tran_begin_time\n" + + ",t.tran_end_time \"tran_end_time\"\n" + + ",CAST(t.tran_id AS bigint) trans_id\n" + + ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" + + ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" + + ",\"o\".\"__$operation\" operation\n" + + ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" + + ",\"o\".\"ID\"\n" + + ",\"o\".\"LastName\"\n" + + ",\"o\".\"FirstName\"\n" + + ",CURRENT_TIMESTAMP EXTRACT_TIME\n" + + "FROM cdc.\"Names\" \"o\"\n" + + "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = \"o\".\"__$start_lsn\")\n" + + "WHERE t.tran_end_time > ?\n" + + "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), \"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime); + } + + @Test + public void testRetrieveAllChanges() throws SQLException, IOException { + setupNamesTable(); + --- End diff -- I've never been able to get CheckStyle to run on my unit tests. It's like IntelliJ doesn't recognize them as valid targets for the plugin. Thanks. > MS SQL CDC Processor > -------------------- > > Key: NIFI-4521 > URL: https://issues.apache.org/jira/browse/NIFI-4521 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Reporter: Peter Wicks > Assignee: Peter Wicks > Priority: Major > > Creation of a new processor that reads Change Data Capture details from > Microsoft SQL Server and outputs the changes as Records. -- This message was sent by Atlassian JIRA (v7.6.3#76005)