tpalfy commented on a change in pull request #3306: nifi oracle cdc changes using xstream URL: https://github.com/apache/nifi/pull/3306#discussion_r336575559
########## File path: nifi-nar-bundles/nifi-oraclecdc-bundle/nifi-oraclecdc-processors/src/main/java/org/apache/nifi/processors/oraclecdc/controller/impl/StandardOracleCDCService.java ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.oraclecdc.controller.impl; + +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.codec.binary.Base32; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.oraclecdc.utils.LCRCallBackHandler; +import org.apache.nifi.reporting.InitializationException; +import org.nifi.oraclecdcservice.api.OracleCDCEventHandler; +import org.nifi.oraclecdcservice.api.OracleCDCService; +import org.nifi.oraclecdcservice.api.OracleClassLoaderService; + +@Tags({ "dbcp", "jdbc", "database", "cdc", "oracle", "store" }) +@CapabilityDescription("Provides Database Connection Pooling Service for oracle CDC connection." + + " Needs OCI native connection. 
Add the location to the native libraries using -Djava.library.path" + + "to the jvm argurmets in nifi bootstrap.conf") + +public class StandardOracleCDCService extends AbstractControllerService implements OracleCDCService { + + public static final PropertyDescriptor DB_HOST = new PropertyDescriptor.Builder().name("DB_HOST") + .displayName("source oracle db port").description("localhost").required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor DB_PORT = new PropertyDescriptor.Builder().name("DB_PORT") + .displayName("source oracle db port").description("1521").required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor DB_SID = new PropertyDescriptor.Builder().name("DB_SID") + .displayName("source oracle SID ").description("source oracle SID ").sensitive(true).required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor DB_USER = new PropertyDescriptor.Builder().name("DB_USER") + .displayName("source oracle xstream capture user").description("source oracle xstream capture user") + .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor DB_PASS = new PropertyDescriptor.Builder().name("DB_PASS") + .displayName("source oracle xstream capture user password") + .description("source oracle xstream capture user password").sensitive(true).required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); + + public static final PropertyDescriptor DB_CLASS_LOADER = new PropertyDescriptor.Builder() + .name("database-driver-class loader service").displayName("Class loader service") + .description( + "create 1 per oracle db version, this is to ensure that native libraries are not reloaded per processor or CS") + .identifiesControllerService(OracleClassLoaderService.class).required(true).build(); + + public static 
final PropertyDescriptor MAX_TOTAL_CONNECTIONS = new PropertyDescriptor.Builder() + .name("Max Total Connections") + .description( + "The maximum number of active connections that can be allocated from this pool at the same time, " + + " or negative for no limit.") + .defaultValue("8").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).sensitive(false) + .build(); + + public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() + .displayName("Minimum Idle Connections").name("dbcp-min-idle-conns") + .description("The minimum number of connections that can remain idle in the pool, without extra ones being " + + "created, or zero to create none.") + .defaultValue("0").required(false).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build(); + + public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() + .displayName("Max Idle Connections").name("dbcp-max-idle-conns") + .description("The maximum number of connections that can remain idle in the pool, without extra ones being " + + "released, or negative for no limit.") + .defaultValue("8").required(false).addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build(); + + private static final List<PropertyDescriptor> properties; + + protected ClassLoader driverClassLoader; + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(DB_HOST); + props.add(DB_PORT); + props.add(DB_USER); + props.add(DB_PASS); + props.add(DB_SID); + props.add(DB_CLASS_LOADER); + props.add(MAX_TOTAL_CONNECTIONS); + props.add(MAX_IDLE); + props.add(MIN_IDLE); + + properties = Collections.unmodifiableList(props); + } + + private volatile BasicDataSource dataSource; + // private volatile String dbUrl; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return 
properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder().name(propertyDescriptorName).required(false) + .addValidator(StandardValidators + .createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).dynamic(true).build(); + } + + /** + * @param context + * the configuration context + * @throws InitializationException + * if unable to create a database connection + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + final String host = context.getProperty(DB_HOST).getValue(); + final String port = context.getProperty(DB_PORT).getValue(); + final String user = context.getProperty(DB_USER).getValue(); + final String passw = context.getProperty(DB_PASS).getValue(); + final String dbSid = context.getProperty(DB_SID).getValue(); + final OracleClassLoaderService clService = context.getProperty(DB_CLASS_LOADER) + .asControllerService(OracleClassLoaderService.class); + final String drv = "oracle.jdbc.OracleDriver"; + final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger(); + final Integer maxIdle = context.getProperty(MAX_IDLE).asInteger(); + final Integer miIdle = context.getProperty(MIN_IDLE).asInteger(); + dataSource = new BasicDataSource(); + dataSource.setAccessToUnderlyingConnectionAllowed(true); + dataSource.setDriverClassName(drv); + + // Optional driver URL, when exist, this URL will be used to locate + // driver jar file location + dataSource.setDriverClassLoader(clService.getClassLoader()); + this.driverClassLoader = clService.getClassLoader(); + final String dburl = "jdbc:oracle:oci:@" + host + ":" + port + ":" + dbSid; + // dataSource.setMaxWaitMillis(maxWaitMillis); + 
dataSource.setMaxTotal(maxTotal); + dataSource.setMinIdle(miIdle); + dataSource.setMaxIdle(maxTotal); + // dataSource.setMaxConnLifetimeMillis(maxConnLifetimeMillis); + // dataSource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); + // dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); + // dataSource.setSoftMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis); + + dataSource.setUrl(dburl); + dataSource.setUsername(user); + dataSource.setPassword(passw); + + context.getProperties().keySet().stream().filter(PropertyDescriptor::isDynamic) + .forEach((dynamicPropDescriptor) -> dataSource.addConnectionProperty(dynamicPropDescriptor.getName(), + context.getProperty(dynamicPropDescriptor).evaluateAttributeExpressions().getValue())); + + } + + @Override + public void receiveEvents(Object xsOut, OracleCDCEventHandler handler) { + if (null == xsOut) { + getLogger().info("xstreamOut is null"); + return; + } + + try { + LCRCallBackHandler hdlr = new LCRCallBackHandler(this.driverClassLoader, handler); + Object proxy = Proxy.newProxyInstance(this.driverClassLoader, + new Class[] { loadClass("oracle.streams.XStreamLCRCallbackHandler") }, hdlr); + Class<?> xstreamOut = loadClass("oracle.streams.XStreamOut"); + Method method = xstreamOut.getMethod("receiveLCRCallback", + loadClass("oracle.streams.XStreamLCRCallbackHandler"), int.class); + method.invoke(xsOut, proxy, + loadClass("oracle.streams.XStreamOut").getDeclaredField("DEFAULT_MODE").getInt(null)); + System.out.print("done"); + } catch (Exception e) { + e.printStackTrace(); + getLogger().warn("exception when processing LCRs"); + getLogger().warn(e.getMessage()); + throw new ProcessException("exception when processing LCRs " + e.getMessage()); + } + } + + @Override + public void setProcessedLowWaterMark(Object xsOutServer, byte[] position) { + try { + System.out.println(new String(new Base32(true).encode(position))); + Class<?> xstreamOut = loadClass("oracle.streams.XStreamOut"); + 
Method method = xstreamOut.getMethod("setProcessedLowWatermark", byte[].class, int.class); + method.invoke(xsOutServer, position, + loadClass("oracle.streams.XStreamOut").getDeclaredField("DEFAULT_MODE").getInt(null)); + } catch (Exception e) { + e.printStackTrace(); + getLogger().warn("exception when set low watermark"); + getLogger().warn(e.getMessage()); + throw new ProcessException("exception when setting low water mark " + e.getMessage()); + } + } + + @Override + public Object attach(String xsOutName, byte[] lastPosition) { + getLogger().info("in attach"); Review comment: This should probably be a debug- or trace-level log (if it's still needed). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services