[ 
https://issues.apache.org/jira/browse/NIFI-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353231#comment-16353231
 ] 

ASF GitHub Bot commented on NIFI-4521:
--------------------------------------

Github user patricker commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2231#discussion_r166170695
  
    --- Diff: 
nifi-nar-bundles/nifi-cdc/nifi-cdc-mssql-bundle/nifi-cdc-mssql-processors/src/main/java/org/apache/nifi/cdc/mssql/processors/CaptureChangeMSSQL.java
 ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.cdc.mssql.processors;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.cdc.CDCException;
    +import org.apache.nifi.cdc.mssql.MSSQLCDCUtils;
    +import org.apache.nifi.cdc.mssql.event.MSSQLTableInfo;
    +import org.apache.nifi.cdc.mssql.event.TableCapturePlan;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.expression.AttributeExpression;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.ResultSetRecordSet;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.sql.Connection;
    +import java.sql.PreparedStatement;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "jdbc", "cdc", "mssql"})
    +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a 
Microsoft SQL database. CDC Events include INSERT, UPDATE, DELETE operations. 
Events "
    +        + "for each table are output as Record Sets, ordered by the time, 
and sequence, at which the operation occurred.")
    +@Stateful(scopes = Scope.CLUSTER, description = "Information including the 
timestamp of the last CDC event per table in the database is stored by this 
processor, so "
    +        + "that it can continue from the same point in time if restarted.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "tablename", description="Name of the 
table this changeset was captured from."),
    +        @WritesAttribute(attribute="mssqlcdc.row.count", description="The 
number of rows in this changeset"),
    +        @WritesAttribute(attribute="fullsnapshot", description="Whether 
this was a full snapshot of the base table or not..")})
    +@DynamicProperty(name = "Initial Timestamp", value = "Attribute Expression 
Language", supportsExpressionLanguage = false, description = "Specifies an 
initial "
    +        + "timestamp for reading CDC data from MS SQL. Properties should 
be added in the format `initial.timestamp.{table_name}`, one for each table. "
    +        + "This property is ignored after the first successful run for a 
table writes to the state manager, and is only used again if state is cleared.")
    +public class CaptureChangeMSSQL extends AbstractSessionFactoryProcessor {
    +    public static final String INITIAL_TIMESTAMP_PROP_START = 
"initial.timestamp.";
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("Specifies the Controller Service to use for 
writing out the records")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor DBCP_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-dbcp-service")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain 
connection to database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CDC_TABLES = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-cdc-table-list")
    +            .displayName("CDC Table List")
    +            .description("The comma delimited list of tables in the source 
database to monitor for changes. If no tables "
    +                    + "are specified the [cdc].[change_tables] table is 
queried for all of the available tables with change tracking enabled in the 
database.")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TAKE_INITIAL_SNAPSHOT = new 
PropertyDescriptor.Builder()
    +            .name("cdcmssql-initial-snapshot")
    +            .displayName("Generate an Initial Source Table Snapshot")
    +            .description("Usually CDC only includes recent historic 
changes. Setting this property to true will cause a snapshot of the "
    +                + "source table to be taken using the same schema as the 
CDC extracts. The snapshot time will be used as the starting point "
    +                + "for extracting CDC changes.")
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor FULL_SNAPSHOT_ROW_LIMIT = new 
PropertyDescriptor
    +            .Builder().name("cdcmssql-full-snapshot-row-limit")
    +            .displayName("Change Set Row Limit")
    +            .description("If a very large change occurs on the source 
table, "
    +                    + "the generated change set may be too large too 
quickly merge into a destination system. "
    +                    + "Use this property to set a cut-off point where 
instead of returning a changeset a full snapshot will be generated instead. "
    +                    + "The fullsnapshot attribute will be set to true when 
this happens.")
    +            .required(true)
    +            .defaultValue("0")
    +            
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query 
result set.")
    +            .build();
    +
    +    protected List<PropertyDescriptor> descriptors;
    +    protected Set<Relationship> relationships;
    +
    +    protected final Map<String, MSSQLTableInfo> schemaCache = new 
ConcurrentHashMap<String, MSSQLTableInfo>(1000);
    --- End diff --
    
    There isn't really a reason to specify an initial capacity at all. I 
copied this from the revolving cache used in other processors, but removed the 
cleanup code that limits the size of the cache. Since this processor does not 
allow inputs, the schema cache is not at risk of growing forever, unlike in 
some other processors that accept an input flowfile and allow expression 
language for the table name.



> MS SQL CDC Processor
> --------------------
>
>                 Key: NIFI-4521
>                 URL: https://issues.apache.org/jira/browse/NIFI-4521
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Peter Wicks
>            Assignee: Peter Wicks
>            Priority: Major
>
> Creation of a new processor that reads Change Data Capture details from 
> Microsoft SQL Server and outputs the changes as Records.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to