gerdansantos commented on a change in pull request #4065: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication. URL: https://github.com/apache/nifi/pull/4065#discussion_r383634258
########## File path: nifi-nar-bundles/nifi-cdc/nifi-cdc-postgresql-bundle/nifi-cdc-postgresql-processors/src/main/java/org/apache/nifi/cdc/postgresql/processors/CaptureChangePostgreSQL.java ########## @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.cdc.postgresql.processors; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.cdc.CDCException; +import org.apache.nifi.cdc.postgresql.pgEasyReplication.ConnectionManager; +import org.apache.nifi.cdc.postgresql.pgEasyReplication.PGEasyReplication; +import org.apache.nifi.cdc.postgresql.pgEasyReplication.Event; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import 
org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.file.classloader.ClassLoaderUtils; + +/** + * A processor to retrieve Change Data Capture (CDC) events and send them as flow files. + */ +@TriggerSerially +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({ "sql", "jdbc", "cdc", "postgresql" }) +@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a PostgreSQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " + + "are output as individual flow files ordered by the time at which the operation occurred. This processor use a replication connection to stream data and sql connection to snapshot.") +@Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such " + + "that it can continue from the same location if restarted.") +@WritesAttributes({ + @WritesAttribute(attribute = "last.lsn.received", description = "A Log Sequence Number (i.e. 
strictly increasing integer value) specifying the order " + + "of the CDC event flow file relative to the other event flow file(s)."), + @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to " + "application/json") }) + +public class CaptureChangePostgreSQL extends AbstractProcessor { + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successfully created FlowFile from CDC event.").build(); + // Properties + public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("cdc-postgresql-host") + .displayName("PostgreSQL Host") + .description("A list of hostname/port entries corresponding to nodes in a PostgreSQL cluster. The entries should be comma separated " + + "using a colon such as host1:port,host2:port,.... For example postgresql.myhost.com:5432. This processor will attempt to connect to " + + "the hosts in the list in order. If one node goes down and failover is enabled for the cluster, then the processor will connect " + + "to the active node (assuming its host entry is specified in this property. 
The default port for PostgreSQL connections is 5432.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-driver-class") + .displayName("PostgreSQL Driver Class Name") + .description("The class name of the PostgreSQL database driver class") + .defaultValue("org.postgresql.Driver") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder() + .name("cdc-postgresql-driver-locations") + .displayName("PostgreSQL Driver Location(s)") + .description("Comma-separated list of files/folders and/or URLs containing the PostgreSQL driver JAR and its dependencies (if any)." + + "For example '/var/tmp/postgresql-42.2.9.jar'") + .defaultValue(null) + .required(false) + .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator())) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-database") + .displayName("Database") + .description("Specifies the name of the database to connect to.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-user") + .displayName("Username") + .description("Username to access PostgreSQL cluster.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("cdc-postgresql-password") + .displayName("Password") + .description("Password to access PostgreSQL 
cluster.") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PUBLICATION = new PropertyDescriptor.Builder() + .name("cdc-postgresql-publication") + .displayName("Publication") + .description("PostgreSQL publication name. A publication is essentially a group of tables whose data changes are intended to be replicated through logical replication.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor SLOT_NAME = new PropertyDescriptor.Builder() + .name("cdc-postgresql-slot-name") + .displayName("Slot Name") + .description("A unique, cluster-wide identifier for the replication slot") + .required(true) Review comment: We really need this information. It guarantees that the PostgreSQL server retains data at the replication level (WAL logs); e.g., if you stop this processor and start it again after some time, the data at the replication level will not have been recycled. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
