[ 
https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353235#comment-16353235
 ] 

ASF GitHub Bot commented on NIFI-4521:
--------------------------------------

Github user patricker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2231#discussion_r166171052
  
    --- Diff: 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/test/java/org/apache/nifi/cdc/mssql/CaptureChangeMSSQLTest.java
 ---
    @@ -0,0 +1,792 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.cdc.mssql;
    +
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.cdc.event.ColumnDefinition;
    +import org.apache.nifi.cdc.mssql.event.MSSQLColumnDefinition;
    +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
    +import org.apache.nifi.cdc.mssql.processors.CaptureChangeMSSQL;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.record.MockRecordWriter;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.apache.nifi.util.file.FileUtils;
    +import org.junit.After;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DriverManager;
    +import java.sql.SQLException;
    +import java.sql.SQLNonTransientConnectionException;
    +import java.sql.Statement;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +public class CaptureChangeMSSQLTest {
    +
    +    private TestRunner runner;
    +    private MockCaptureChangeMSSQL processor;
    +    private final static String DB_LOCATION = "target/db_qdt";
    +
    +
    +    @BeforeClass
    +    public static void setupBeforeClass() throws IOException, SQLException 
{
    +        System.setProperty("derby.stream.error.file", "target/derby.log");
    +
    +        // remove previous test database, if any
    +        final File dbLocation = new File(DB_LOCATION);
    +        try {
    +            FileUtils.deleteFile(dbLocation, true);
    +        } catch (IOException ioe) {
    +            // Do nothing, may not have existed
    +        }
    +
    +        // load CDC schema to database
    +        final DBCPService dbcp = new DBCPServiceSimpleImpl();
    +        final Connection con = dbcp.getConnection();
    +        Statement stmt = con.createStatement();
    +
    +        stmt.execute("CREATE TABLE cdc.change_tables(\n" +
    +                "object_id int,\n" +
    +                //These four columns are computed from 
object_id/source_object_id in MS SQL, but for testing we put them as strings
    +                "schemaName varchar(128),\n" +
    +                "tableName varchar(128),\n" +
    +                "sourceSchemaName varchar(128),\n" +
    +                "sourceTableName varchar(128),\n" +
    +
    +                "version int,\n" +
    +                "capture_instance varchar(128),\n" +
    +                "start_lsn int,\n" +
    +                "end_lsn int,\n" +
    +                "supports_net_changes BOOLEAN,\n" +
    +                "has_drop_pending BOOLEAN,\n" +
    +                "role_name varchar(128),\n" +
    +                "index_name varchar(128),\n" +
    +                "filegroup_name varchar(128),\n" +
    +                "create_date TIMESTAMP,\n" +
    +                "partition_switch BOOLEAN)");
    +
    +        stmt.execute("CREATE TABLE cdc.lsn_time_mapping(\n" +
    +                "start_lsn int,\n" +
    +                "tran_begin_time TIMESTAMP,\n" +
    +                "tran_end_time TIMESTAMP,\n" +
    +                "tran_id int,\n" +
    +                "tran_begin_lsn int)");
    +
    +        stmt.execute("CREATE TABLE cdc.index_columns(\n" +
    +                "object_id int,\n" +
    +                "column_name varchar(128),\n" +
    +                "index_ordinal int,\n" +
    +                "column_id int)");
    +
    +        stmt.execute("CREATE TABLE cdc.captured_columns(\n" +
    +                "object_id int,\n" +
    +                "column_name varchar(128),\n" +
    +                "column_id int,\n" +
    +                "column_type varchar(128),\n" +
    +                "column_ordinal int,\n" +
    +                "is_computed BOOLEAN)");
    +    }
    +
    +    @AfterClass
    +    public static void cleanUpAfterClass() throws Exception {
    +        try {
    +            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + 
";shutdown=true");
    +        } catch (SQLNonTransientConnectionException e) {
    +            // Do nothing, this is what happens at Derby shutdown
    +        }
    +        // remove previous test database, if any
    +        final File dbLocation = new File(DB_LOCATION);
    +        try {
    +            FileUtils.deleteFile(dbLocation, true);
    +        } catch (IOException ioe) {
    +            // Do nothing, may not have existed
    +        }
    +    }
    +
    +    @Before
    +    public void setup() throws InitializationException, IOException, 
SQLException {
    +        final DBCPService dbcp = new DBCPServiceSimpleImpl();
    +        final Map<String, String> dbcpProperties = new HashMap<>();
    +
    +        processor = new MockCaptureChangeMSSQL();
    +        runner = TestRunners.newTestRunner(processor);
    +
    +        runner.addControllerService("dbcp", dbcp, dbcpProperties);
    +        runner.enableControllerService(dbcp);
    +        runner.setProperty(CaptureChangeMSSQL.DBCP_SERVICE, "dbcp");
    +
    +        final MockRecordWriter writerService = new MockRecordWriter(null, 
false);
    +        runner.addControllerService("writer", writerService);
    +        runner.enableControllerService(writerService);
    +        runner.setProperty(CaptureChangeMSSQL.RECORD_WRITER, "writer");
    +
    +        runner.getStateManager().clear(Scope.CLUSTER);
    +    }
    +
    +    @After
    +    public void teardown() throws IOException {
    +        runner.getStateManager().clear(Scope.CLUSTER);
    +        runner = null;
    +    }
    +
    +    @Test
    +    public void testSelectGenerator(){
    +        MSSQLCDCUtils mssqlcdcUtils = new MSSQLCDCUtils();
    +
    +        List<ColumnDefinition> columns = new ArrayList<>();
    +        columns.add(new MSSQLColumnDefinition(Types.INTEGER, "ID", 1, 
true));
    +        columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "LastName", 
2, false));
    +        columns.add(new MSSQLColumnDefinition(Types.VARCHAR, "FirstName", 
3, false));
    +
    +        MSSQLTableInfo ti = new MSSQLTableInfo("NiFi", "cdc", "Names",
    +                "dbo", "dbo_Names_CT", 1000L, columns);
    +
    +        String noMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, null);
    +
    +        Assert.assertEquals("SELECT t.tran_begin_time\n" +
    +                ",t.tran_end_time \"tran_end_time\"\n" +
    +                ",CAST(t.tran_id AS bigint) trans_id\n" +
    +                ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" +
    +                ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" +
    +                ",\"o\".\"__$operation\" operation\n" +
    +                ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" +
    +                ",\"o\".\"ID\"\n" +
    +                ",\"o\".\"LastName\"\n" +
    +                ",\"o\".\"FirstName\"\n" +
    +                ",CURRENT_TIMESTAMP EXTRACT_TIME\n" +
    +                "FROM cdc.\"Names\" \"o\"\n" +
    +                "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = 
\"o\".\"__$start_lsn\")\n" +
    +                "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), 
\"o\".\"__$seqval\", \"o\".\"__$operation\"", noMaxTime);
    +
    +        String withMaxTime = mssqlcdcUtils.getCDCSelectStatement(ti, new 
Timestamp(0));
    +
    +        Assert.assertEquals("SELECT t.tran_begin_time\n" +
    +                ",t.tran_end_time \"tran_end_time\"\n" +
    +                ",CAST(t.tran_id AS bigint) trans_id\n" +
    +                ",CAST(\"o\".\"__$start_lsn\" AS bigint) start_lsn\n" +
    +                ",CAST(\"o\".\"__$seqval\" AS bigint) seqval\n" +
    +                ",\"o\".\"__$operation\" operation\n" +
    +                ",CAST(\"o\".\"__$update_mask\" AS bigint) update_mask\n" +
    +                ",\"o\".\"ID\"\n" +
    +                ",\"o\".\"LastName\"\n" +
    +                ",\"o\".\"FirstName\"\n" +
    +                ",CURRENT_TIMESTAMP EXTRACT_TIME\n" +
    +                "FROM cdc.\"Names\" \"o\"\n" +
    +                "INNER JOIN cdc.lsn_time_mapping t ON (t.start_lsn = 
\"o\".\"__$start_lsn\")\n" +
    +                "WHERE t.tran_end_time > ?\n" +
    +                "ORDER BY CAST(\"o\".\"__$start_lsn\" AS bigint), 
\"o\".\"__$seqval\", \"o\".\"__$operation\"", withMaxTime);
    +    }
    +
    +    @Test
    +    public void testRetrieveAllChanges() throws SQLException, IOException {
    +        setupNamesTable();
    +        
    --- End diff --
    
    I've never been able to get CheckStyle to run on my unit tests. It's like 
IntelliJ doesn't recognize them as valid targets for the plugin. Thanks.


> MS SQL CDC Processor
> --------------------
>
>                 Key: NIFI-4521
>                 URL: https://issues.apache.org/jira/browse/NIFI-4521
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Peter Wicks
>            Assignee: Peter Wicks
>            Priority: Major
>
> Creation of a new processor that reads Change Data Capture details from 
> Microsoft SQL Server and outputs the changes a Records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to