This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ca76fe178cfae8890b347081260cd59a62321219 Author: Matthew Burgess <[email protected]> AuthorDate: Wed Feb 27 16:17:46 2019 -0500 NIFI-6082: Added DatabaseRecordLookupService, refactored common DB utils NIFI-6082: Added SimpleDatabaseLookupService NIFI-6082: Merged Koji's improvements, incorporated review comments This closes #3341. Signed-off-by: Koji Kawamura <[email protected]> --- .../serialization/record/ResultSetRecordSet.java | 18 +- .../{ => nifi-database-test-utils}/pom.xml | 26 +-- .../apache/nifi/util/db/SimpleCommerceDataSet.java | 112 ++++++++++ .../nifi-database-utils/pom.xml | 101 +++++++++ .../java/org/apache/nifi/util/db}/AvroUtil.java | 2 +- .../java/org/apache/nifi/util/db}/JdbcCommon.java | 71 +------ .../apache/nifi/util/db}/JdbcCommonTestUtils.java | 2 +- .../org/apache/nifi/util/db}/TestJdbcCommon.java | 18 +- .../nifi/util/db}/TestJdbcCommonConvertToAvro.java | 8 +- .../apache/nifi/util/db}/TestJdbcHugeStream.java | 2 +- .../apache/nifi/util/db}/TestJdbcTypesDerby.java | 2 +- .../org/apache/nifi/util/db}/TestJdbcTypesH2.java | 2 +- nifi-nar-bundles/nifi-extension-utils/pom.xml | 2 + .../nifi-standard-processors/pom.xml | 11 + .../processors/standard/AbstractExecuteSQL.java | 2 +- .../standard/AbstractQueryDatabaseTable.java | 2 +- .../nifi/processors/standard/ExecuteSQL.java | 12 +- .../nifi/processors/standard/ExecuteSQLRecord.java | 5 +- .../nifi/processors/standard/LookupRecord.java | 5 +- .../apache/nifi/processors/standard/PutSQL.java | 2 +- .../processors/standard/QueryDatabaseTable.java | 10 +- .../standard/QueryDatabaseTableRecord.java | 4 +- .../standard/sql/DefaultAvroSqlWriter.java | 12 +- .../processors/standard/sql/RecordSqlWriter.java | 14 +- .../nifi/processors/standard/sql/SqlWriter.java | 6 +- .../processors/standard/util/JdbcProperties.java | 81 ++++++++ .../nifi/processors/standard/TestExecuteSQL.java | 6 +- .../processors/standard/TestExecuteSQLRecord.java | 4 +- .../nifi-lookup-services/pom.xml | 29 ++- 
.../lookup/db/AbstractDatabaseLookupService.java | 104 ++++++++++ .../lookup/db/DatabaseRecordLookupService.java | 206 ++++++++++++++++++ .../lookup/db/SimpleDatabaseLookupService.java | 174 ++++++++++++++++ .../org.apache.nifi.controller.ControllerService | 2 + .../db/TestDatabaseRecordLookupService.groovy | 229 +++++++++++++++++++++ .../db/TestSimpleDatabaseLookupService.groovy | 184 +++++++++++++++++ 35 files changed, 1322 insertions(+), 148 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index ee47c63..fc3d60f 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -55,9 +55,21 @@ public class ResultSetRecordSet implements RecordSet, Closeable { private static final String FLOAT_CLASS_NAME = Float.class.getName(); public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { + this(rs, readerSchema, false); + } + + /** + * Constructs a ResultSetRecordSet with a given ResultSet and RecordSchema + * + * @param rs The underlying ResultSet for this RecordSet + * @param readerSchema The schema to which this RecordSet adheres + * @param allFieldsNullable Whether to override the database column's "nullable" metadata. If true then all fields in the RecordSet are nullable. 
+ * @throws SQLException if an error occurs while creating the schema or reading the result set's metadata + */ + public ResultSetRecordSet(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException { this.rs = rs; moreRows = rs.next(); - this.schema = createSchema(rs, readerSchema); + this.schema = createSchema(rs, readerSchema, allFieldsNullable); rsColumnNames = new HashSet<>(); final ResultSetMetaData metadata = rs.getMetaData(); @@ -140,7 +152,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return value; } - private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema) throws SQLException { + private static RecordSchema createSchema(final ResultSet rs, final RecordSchema readerSchema, boolean allFieldsNullable) throws SQLException { final ResultSetMetaData metadata = rs.getMetaData(); final int numCols = metadata.getColumnCount(); final List<RecordField> fields = new ArrayList<>(numCols); @@ -154,7 +166,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { final int nullableFlag = metadata.isNullable(column); final boolean nullable; - if (nullableFlag == ResultSetMetaData.columnNoNulls) { + if (nullableFlag == ResultSetMetaData.columnNoNulls && !allFieldsNullable) { nullable = false; } else { nullable = true; diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml similarity index 55% copy from nifi-nar-bundles/nifi-extension-utils/pom.xml copy to nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml index b2c8f51..cf63b5e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/pom.xml @@ -1,4 +1,4 @@ -<?xml version="1.0"?> +<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with @@ -13,25 +13,19 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar-bundles</artifactId> + <artifactId>nifi-extension-utils</artifactId> <version>1.10.0-SNAPSHOT</version> </parent> - <packaging>pom</packaging> - <artifactId>nifi-extension-utils</artifactId> - <description> - This module contains reusable utilities related to extensions that can be shared across NARs. - </description> - <modules> - <module>nifi-record-utils</module> - <module>nifi-hadoop-utils</module> - <module>nifi-processor-utils</module> - <module>nifi-reporting-utils</module> - <module>nifi-syslog-utils</module> - </modules> + <artifactId>nifi-database-test-utils</artifactId> -</project> + <dependencies> + </dependencies> + +</project> \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java new file mode 100644 index 0000000..89c2600 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-test-utils/src/main/java/org/apache/nifi/util/db/SimpleCommerceDataSet.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.db; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +/** + * A sample data set for test consists of 'persons', 'products' and 'relationships' tables. + */ +public class SimpleCommerceDataSet { + + static String dropPersons = "drop table persons"; + static String dropProducts = "drop table products"; + static String dropRelationships = "drop table relationships"; + static String createPersons = "create table persons (id integer, name varchar(100), code integer)"; + static String createProducts = "create table products (id integer, name varchar(100), code integer)"; + static String createRelationships = "create table relationships (id integer,name varchar(100), code integer)"; + + public static void loadTestData2Database(Connection con, int nrOfPersons, int nrOfProducts, int nrOfRels) throws SQLException { + + System.out.println(createRandomName()); + System.out.println(createRandomName()); + System.out.println(createRandomName()); + + final Statement st = con.createStatement(); + + // tables may not exist, this is not serious problem. 
+ try { + st.executeUpdate(dropPersons); + } catch (final Exception ignored) { + } + + try { + st.executeUpdate(dropProducts); + } catch (final Exception ignored) { + } + + try { + st.executeUpdate(dropRelationships); + } catch (final Exception ignored) { + } + + st.executeUpdate(createPersons); + st.executeUpdate(createProducts); + st.executeUpdate(createRelationships); + + for (int i = 0; i < nrOfPersons; i++) + loadPersons(st, i); + + for (int i = 0; i < nrOfProducts; i++) + loadProducts(st, i); + + for (int i = 0; i < nrOfRels; i++) + loadRelationships(st, i); + + st.close(); + } + + static Random rng = new Random(53495); + + static private void loadPersons(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into persons values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private void loadProducts(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into products values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private void loadRelationships(Statement st, int nr) throws SQLException { + st.executeUpdate("insert into relationships values (" + nr + ", '" + createRandomName() + "', " + rng.nextInt(469946) + ")"); + } + + static private String createRandomName() { + return createRandomString() + " " + createRandomString(); + } + + static private String createRandomString() { + + final int length = rng.nextInt(10); + final String characters = "ABCDEFGHIJ"; + + final char[] text = new char[length]; + for (int i = 0; i < length; i++) { + text[i] = characters.charAt(rng.nextInt(characters.length())); + } + return new String(text); + } + + private Connection createConnection(String location) throws ClassNotFoundException, SQLException { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + location + ";create=true"); + } + +} diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml new file mode 100644 index 0000000..3cc62e8 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/pom.xml @@ -0,0 +1,101 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-extension-utils</artifactId> + <version>1.10.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-database-utils</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-record-utils</artifactId> + <version>1.10.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.10.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.6.2</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.8.1</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.8.1</version> + </dependency> + <!-- Other modules using nifi-database-utils are expected to have these APIs available, typically through a NAR dependency --> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.10.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.6</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.11.1.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.187</version> 
+ <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/org/apache/nifi/avro/data.avro</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/schema.json</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/simpleSchema.json</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue1.json</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayValue2.json</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords1.json</exclude> + <exclude>src/test/resources/org/apache/nifi/avro/defaultArrayInRecords2.json</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java similarity index 97% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java index 970c7c2..8bb2261 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/AvroUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/AvroUtil.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import org.apache.avro.file.CodecFactory; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java similarity index 90% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java index 3de86c7..e41b3cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import static java.sql.Types.ARRAY; import static java.sql.Types.BIGINT; @@ -100,12 +100,9 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.avro.AvroTypeUtil; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.processor.util.StandardValidators; import javax.xml.bind.DatatypeConverter; @@ -114,11 +111,11 @@ import javax.xml.bind.DatatypeConverter; */ public class JdbcCommon { - private static final int MAX_DIGITS_IN_BIGINT = 19; - private static final int MAX_DIGITS_IN_INT = 9; + public static final int MAX_DIGITS_IN_BIGINT = 19; + public static final int MAX_DIGITS_IN_INT = 9; // Derived from MySQL default precision. - private static final int DEFAULT_PRECISION_VALUE = 10; - private static final int DEFAULT_SCALE_VALUE = 0; + public static final int DEFAULT_PRECISION_VALUE = 10; + public static final int DEFAULT_SCALE_VALUE = 0; public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$"); public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); @@ -126,62 +123,6 @@ public class JdbcCommon { public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; - public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() - .name("dbf-normalize") - .displayName("Normalize Table/Column Names") - .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. 
For example, colons and periods " - + "will be changed to underscores in order to build a valid Avro record.") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); - - public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder() - .name("dbf-user-logical-types") - .displayName("Use Avro Logical Types") - .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. " - + "If disabled, written as string. " - + "If enabled, Logical types are used and written as its underlying type, specifically, " - + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, " - + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), " - + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, " - + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. " - + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.") - .allowableValues("true", "false") - .defaultValue("false") - .required(true) - .build(); - - public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder() - .name("dbf-default-precision") - .displayName("Default Decimal Precision") - .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type," - + " a specific 'precision' denoting number of available digits is required." - + " Generally, precision is defined by column data type definition or database engines default." - + " However undefined precision (0) can be returned from some database engines." 
- + " 'Default Decimal Precision' is used when writing those undefined precision numbers.") - .defaultValue(String.valueOf(DEFAULT_PRECISION_VALUE)) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder() - .name("dbf-default-scale") - .displayName("Default Decimal Scale") - .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type," - + " a specific 'scale' denoting number of available decimal digits is required." - + " Generally, scale is defined by column data type definition or database engines default." - + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines." - + " 'Default Decimal Scale' is used when writing those undefined numbers." - + " If a value has more decimals than specified scale, then the value will be rounded-up," - + " e.g. 
1.53 becomes 2 with scale 0, and 1.5 with scale 1.") - .defaultValue(String.valueOf(DEFAULT_SCALE_VALUE)) - .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - public static long convertToAvroStream(final ResultSet rs, final OutputStream outStream, boolean convertNames) throws SQLException, IOException { return convertToAvroStream(rs, outStream, null, null, convertNames); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java similarity index 97% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java index ad57158..cc8d29e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/JdbcCommonTestUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/JdbcCommonTestUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java similarity index 97% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java index 9cf4fc1..fa584c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommon.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommon.java @@ -14,10 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; -import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream; -import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -427,7 +425,7 @@ public class TestJdbcCommon { when(metadata.getPrecision(1)).thenReturn(dbPrecision); when(metadata.getScale(1)).thenReturn(expectedScale); - final ResultSet rs = resultSetReturningMetadata(metadata); + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); when(rs.getObject(Mockito.anyInt())).thenReturn(bigDecimal); @@ -580,12 +578,12 @@ public class TestJdbcCommon { when(metadata.getColumnName(1)).thenReturn("t_int"); when(metadata.getTableName(1)).thenReturn("table"); - final ResultSet rs = resultSetReturningMetadata(metadata); + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); final short s = 25; when(rs.getObject(Mockito.anyInt())).thenReturn(s); - final InputStream instream = convertResultSetToAvroInputStream(rs); + final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) { @@ -608,12 +606,12 @@ public class TestJdbcCommon { when(metadata.getColumnName(1)).thenReturn(mockColumnName); when(metadata.getTableName(1)).thenReturn("table"); - final ResultSet rs = resultSetReturningMetadata(metadata); + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); final Long ret = 0L; when(rs.getObject(Mockito.anyInt())).thenReturn(ret); - final InputStream instream = convertResultSetToAvroInputStream(rs); + final InputStream instream = 
JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) { @@ -636,12 +634,12 @@ public class TestJdbcCommon { when(metadata.getColumnName(1)).thenReturn(mockColumnName); when(metadata.getTableName(1)).thenReturn("table"); - final ResultSet rs = resultSetReturningMetadata(metadata); + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); final Long ret = 0L; when(rs.getObject(Mockito.anyInt())).thenReturn(ret); - final InputStream instream = convertResultSetToAvroInputStream(rs); + final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java similarity index 92% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java index eb736e2..e6f9743 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcCommonConvertToAvro.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcCommonConvertToAvro.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * 
limitations under the License. */ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -40,8 +40,6 @@ import static java.sql.Types.INTEGER; import static java.sql.Types.SMALLINT; import static java.sql.Types.TINYINT; import static java.sql.Types.BIGINT; -import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.convertResultSetToAvroInputStream; -import static org.apache.nifi.processors.standard.util.JdbcCommonTestUtils.resultSetReturningMetadata; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -133,12 +131,12 @@ public class TestJdbcCommonConvertToAvro { when(metadata.getColumnName(1)).thenReturn("t_int"); when(metadata.getTableName(1)).thenReturn("table"); - final ResultSet rs = resultSetReturningMetadata(metadata); + final ResultSet rs = JdbcCommonTestUtils.resultSetReturningMetadata(metadata); final int ret = 0; when(rs.getObject(Mockito.anyInt())).thenReturn(ret); - final InputStream instream = convertResultSetToAvroInputStream(rs); + final InputStream instream = JdbcCommonTestUtils.convertResultSetToAvroInputStream(rs); final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); try (final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(instream, datumReader)) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java similarity index 99% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java rename to 
nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java index 499127b..e44024a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcHugeStream.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcHugeStream.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import static org.junit.Assert.assertEquals; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java similarity index 99% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java index 2c3eb58..37af3ac 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesDerby.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesDerby.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import static org.junit.Assert.assertNotNull; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java similarity index 99% rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java index c4f6071..5f594df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TestJdbcTypesH2.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/test/java/org/apache/nifi/util/db/TestJdbcTypesH2.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.standard.util; +package org.apache.nifi.util.db; import static org.junit.Assert.assertNotNull; diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml index b2c8f51..ccec552 100644 --- a/nifi-nar-bundles/nifi-extension-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml @@ -32,6 +32,8 @@ <module>nifi-processor-utils</module> <module>nifi-reporting-utils</module> <module>nifi-syslog-utils</module> + <module>nifi-database-utils</module> + <module>nifi-database-test-utils</module> </modules> </project> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 7c1a13c..fb10f6a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -344,6 +344,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-database-utils</artifactId> + <version>1.10.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-standard-web-test-utils</artifactId> <version>1.10.0-SNAPSHOT</version> <scope>test</scope> @@ -354,6 +359,12 @@ <version>2.0.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-database-test-utils</artifactId> + <version>1.10.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index e013a5c..212febc 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -33,8 +33,8 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.db.JdbcCommon; import java.nio.charset.Charset; import java.sql.Connection; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java index 6b166d9..1df0ae2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -37,8 +37,8 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.db.DatabaseAdapter; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.db.JdbcCommon; import java.io.IOException; import java.sql.Connection; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index cfdef29..f058b77 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -39,13 +39,13 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; -import org.apache.nifi.processors.standard.util.AvroUtil.CodecType; +import org.apache.nifi.util.db.JdbcCommon; -import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION; -import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE; -import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO; -import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; +import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION; +import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE; +import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO; +import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES; +import static org.apache.nifi.util.db.AvroUtil.CodecType; @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 80d33c0..897a929 100644 
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -32,8 +32,8 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.sql.RecordSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.db.JdbcCommon; import java.util.ArrayList; import java.util.Collections; @@ -41,7 +41,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; +import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES; + @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index b9686b2..96a8d3e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -73,7 +73,7 @@ import java.util.stream.Collectors; @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") }) -@Tags({"lookup", "enrichment", "route", 
"record", "csv", "json", "avro", "logs", "convert", "filter"}) +@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "database", "db", "logs", "convert", "filter"}) @CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, " + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then " + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), " @@ -87,7 +87,8 @@ import java.util.stream.Collectors; + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.") @DynamicProperty(name = "Value To Lookup", value = "Valid Record Path", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "A RecordPath that points to the field whose value will be looked up in the configured Lookup Service") -@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"}) +@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, + classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService", "org.apache.nifi.lookup.db.DatabaseRecordLookupService"}) public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> { private volatile RecordPathCache recordPathCache = new RecordPathCache(25); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java index 8834821..6a4e3a6 100644 --- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java @@ -51,8 +51,8 @@ import org.apache.nifi.processor.util.pattern.PartialFunctions.FlowFileGroup; import org.apache.nifi.processor.util.pattern.PutGroup; import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.processor.util.pattern.RoutingResult; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.db.JdbcCommon; import java.io.IOException; import java.io.InputStream; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java index 1089370..b8cc75c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java @@ -35,7 +35,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.db.JdbcCommon; import java.util.ArrayList; import java.util.Collections; @@ -43,10 +43,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION; -import static 
org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE; -import static org.apache.nifi.processors.standard.util.JdbcCommon.NORMALIZE_NAMES_FOR_AVRO; -import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; +import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_PRECISION; +import static org.apache.nifi.processors.standard.util.JdbcProperties.DEFAULT_SCALE; +import static org.apache.nifi.processors.standard.util.JdbcProperties.NORMALIZE_NAMES_FOR_AVRO; +import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES; @TriggerSerially diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java index 4464842..371d225 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java @@ -35,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.sql.RecordSqlWriter; import org.apache.nifi.processors.standard.sql.SqlWriter; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.db.JdbcCommon; import java.util.ArrayList; import java.util.Collections; @@ -44,7 +44,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES; +import static org.apache.nifi.processors.standard.util.JdbcProperties.USE_AVRO_LOGICAL_TYPES; 
@TriggerSerially diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java index 574aca7..d5b51c8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java @@ -19,8 +19,7 @@ package org.apache.nifi.processors.standard.sql; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; -import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.db.JdbcCommon; import java.io.IOException; import java.io.OutputStream; @@ -29,20 +28,23 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import static org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions; +import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback; + public class DefaultAvroSqlWriter implements SqlWriter { - private final JdbcCommon.AvroConversionOptions options; + private final AvroConversionOptions options; private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{ put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); }}; - public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) { + public DefaultAvroSqlWriter(AvroConversionOptions options) { this.options = options; } @Override - public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, 
AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception { + public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception { try { return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback); } catch (SQLException e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java index c1a76b4..d5d798b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java @@ -22,8 +22,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; -import org.apache.nifi.processors.standard.util.JdbcCommon; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; @@ -32,6 +30,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.serialization.record.ResultSetRecordSet; +import org.apache.nifi.util.db.JdbcCommon; import java.io.IOException; import java.io.OutputStream; @@ -41,6 +40,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import static 
org.apache.nifi.util.db.JdbcCommon.AvroConversionOptions; +import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback; + public class RecordSqlWriter implements SqlWriter { private final RecordSetWriterFactory recordSetWriterFactory; @@ -52,7 +54,7 @@ public class RecordSqlWriter implements SqlWriter { private RecordSchema writeSchema; private String mimeType; - public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) { + public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) { this.recordSetWriterFactory = recordSetWriterFactory; this.writeResultRef = new AtomicReference<>(); this.maxRowsPerFlowFile = maxRowsPerFlowFile; @@ -61,7 +63,7 @@ public class RecordSqlWriter implements SqlWriter { } @Override - public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception { + public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception { final RecordSet recordSet; try { if (fullRecordSet == null) { @@ -129,9 +131,9 @@ public class RecordSqlWriter implements SqlWriter { private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet { - private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback; + private final ResultSetRowCallback callback; - ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException { + ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, ResultSetRowCallback callback) throws SQLException { super(rs, readerSchema); this.callback = callback; } diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java index 08fc3fd..abbe842 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java @@ -18,7 +18,6 @@ package org.apache.nifi.processors.standard.sql; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable; import java.io.IOException; import java.io.OutputStream; @@ -26,6 +25,9 @@ import java.sql.ResultSet; import java.util.Collections; import java.util.Map; +import static org.apache.nifi.util.db.JdbcCommon.ResultSetRowCallback; + + /** * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro but ExecuteSQLRecord @@ -42,7 +44,7 @@ public interface SqlWriter { * @return the number of rows written to the output stream * @throws Exception if any errors occur during the writing of the result set to the output stream */ - long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception; + long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, ResultSetRowCallback callback) throws Exception; /** * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). 
The default implementation is to return an empty map. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java new file mode 100644 index 0000000..4683cbc --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcProperties.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.db.JdbcCommon; + +public class JdbcProperties { + + public static final PropertyDescriptor NORMALIZE_NAMES_FOR_AVRO = new PropertyDescriptor.Builder() + .name("dbf-normalize") + .displayName("Normalize Table/Column Names") + .description("Whether to change non-Avro-compatible characters in column names to Avro-compatible characters. 
For example, colons and periods " + + "will be changed to underscores in order to build a valid Avro record.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor USE_AVRO_LOGICAL_TYPES = new PropertyDescriptor.Builder() + .name("dbf-user-logical-types") + .displayName("Use Avro Logical Types") + .description("Whether to use Avro Logical Types for DECIMAL/NUMBER, DATE, TIME and TIMESTAMP columns. " + + "If disabled, written as string. " + + "If enabled, Logical types are used and written as its underlying type, specifically, " + + "DECIMAL/NUMBER as logical 'decimal': written as bytes with additional precision and scale meta data, " + + "DATE as logical 'date-millis': written as int denoting days since Unix epoch (1970-01-01), " + + "TIME as logical 'time-millis': written as int denoting milliseconds since Unix epoch, " + + "and TIMESTAMP as logical 'timestamp-millis': written as long denoting milliseconds since Unix epoch. " + + "If a reader of written Avro records also knows these logical types, then these values can be deserialized with more context depending on reader implementation.") + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + + public static final PropertyDescriptor DEFAULT_PRECISION = new PropertyDescriptor.Builder() + .name("dbf-default-precision") + .displayName("Default Decimal Precision") + .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type," + + " a specific 'precision' denoting number of available digits is required." + + " Generally, precision is defined by column data type definition or database engines default." + + " However undefined precision (0) can be returned from some database engines." 
+ + " 'Default Decimal Precision' is used when writing those undefined precision numbers.") + .defaultValue(String.valueOf(JdbcCommon.DEFAULT_PRECISION_VALUE)) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor DEFAULT_SCALE = new PropertyDescriptor.Builder() + .name("dbf-default-scale") + .displayName("Default Decimal Scale") + .description("When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type," + + " a specific 'scale' denoting number of available decimal digits is required." + + " Generally, scale is defined by column data type definition or database engines default." + + " However when undefined precision (0) is returned, scale can also be uncertain with some database engines." + + " 'Default Decimal Scale' is used when writing those undefined numbers." + + " If a value has more decimals than specified scale, then the value will be rounded-up," + + " e.g. 
1.53 becomes 2 with scale 0, and 1.5 with scale 1.") + .defaultValue(String.valueOf(JdbcCommon.DEFAULT_SCALE_VALUE)) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 5458434..9961c6f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -46,12 +46,12 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.standard.util.AvroUtil; -import org.apache.nifi.processors.standard.util.TestJdbcHugeStream; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.db.AvroUtil; +import org.apache.nifi.util.db.SimpleCommerceDataSet; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -540,7 +540,7 @@ public class TestExecuteSQL { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); - TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100); + SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100); LOGGER.info("test data loaded"); // ResultSet size will be 1x200x100 = 20 000 rows 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 6f6a091..375cb98 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -28,7 +28,6 @@ import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.standard.util.TestJdbcHugeStream; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; @@ -36,6 +35,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.db.SimpleCommerceDataSet; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -492,7 +492,7 @@ public class TestExecuteSQLRecord { // load test data to database final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); - TestJdbcHugeStream.loadTestData2Database(con, 100, 200, 100); + SimpleCommerceDataSet.loadTestData2Database(con, 100, 200, 100); LOGGER.info("test data loaded"); // ResultSet size will be 1x200x100 = 20 000 rows diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml index 2429ca4..ee3ba71 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml @@ -83,9 +83,8 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-mock</artifactId> + <artifactId>nifi-database-utils</artifactId> <version>1.10.0-SNAPSHOT</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.nifi</groupId> @@ -105,6 +104,24 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-dbcp-service-api</artifactId> + <version>1.10.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>com.burgstaller</groupId> + <artifactId>okhttp-digest</artifactId> + <version>1.18</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.10.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock-record-utils</artifactId> <version>1.10.0-SNAPSHOT</version> <scope>test</scope> @@ -141,10 +158,10 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.burgstaller</groupId> - <artifactId>okhttp-digest</artifactId> - <version>1.18</version> - <scope>compile</scope> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.11.1.1</version> + <scope>test</scope> </dependency> </dependencies> <build> diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java new file mode 100644 index 0000000..e91e6d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/db/AbstractDatabaseLookupService.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.lookup.db;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Common state and property descriptors for the database-backed lookup services.
 *
 * <p>This class only declares the shared descriptors and fields; it never assigns
 * {@link #properties}, {@link #dbcpService} or {@link #lookupKeyColumn} itself, so a
 * subclass is expected to populate them before they are read.</p>
 */
public class AbstractDatabaseLookupService extends AbstractControllerService {

    /** Name of the single coordinate a caller must supply to a lookup. */
    static final String KEY = "key";

    /** Read-only set holding the one required coordinate name, {@link #KEY}. */
    static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));

    /** Connection pool from which lookups obtain their JDBC connections. */
    static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-dbcp-service")
            .displayName("Database Connection Pooling Service")
            .description("The Controller Service that is used to obtain connection to database")
            .identifiesControllerService(DBCPService.class)
            .required(true)
            .build();

    /** Table to query; may be resolved against FlowFile attributes. */
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-table-name")
            .displayName("Table Name")
            .description("The name of the database table to be queried. Note that this may be case-sensitive depending on the database.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    /** Column compared against the lookup key. */
    static final PropertyDescriptor LOOKUP_KEY_COLUMN = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-key-column")
            .displayName("Lookup Key Column")
            .description("The column in the table that will serve as the lookup key. This is the column that will be matched against "
                    + "the property specified in the lookup processor. Note that this may be case-sensitive depending on the database.")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .required(true)
            .build();

    /** Maximum number of cached lookup results; defaults to "0". */
    static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-cache-size")
            .displayName("Cache Size")
            .description("Specifies how many lookup values/records should be cached. The cache is shared for all tables and keeps a map of lookup values to records. "
                    + "Setting this property to zero means no caching will be done and the table will be queried for each lookup value in each record. If the lookup "
                    + "table changes often or the most recent data must be retrieved, do not use the cache.")
            .required(true)
            .defaultValue("0")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    /** Whether the cache is rebuilt each time the service is enabled; defaults to "true". */
    static final PropertyDescriptor CLEAR_CACHE_ON_ENABLED = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-clear-cache-on-enabled")
            .displayName("Clear Cache on Enabled")
            .description("Whether to clear the cache when this service is enabled. If the Cache Size is zero then this property is ignored. Clearing the cache when the "
                    + "service is enabled ensures that the service will first go to the database to get the most recent data.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    /** Optional time period after which cached entries expire. */
    static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder()
            .name("Cache Expiration")
            .description("Time interval to clear all cache entries. If the Cache Size is zero then this property is ignored.")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .required(false)
            .build();

    /** Descriptors reported by {@link #getSupportedPropertyDescriptors()}; not assigned in this class. */
    protected List<PropertyDescriptor> properties;

    /** Connection pool used by lookups; not assigned in this class. */
    DBCPService dbcpService;

    /** Resolved name of the key column; volatile so a value written on enable is visible to lookup threads. */
    volatile String lookupKeyColumn;

    /**
     * Returns the descriptor list exactly as stored in {@link #properties}.
     *
     * @return the supported property descriptors, or {@code null} if none have been assigned yet
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }
}
package org.apache.nifi.lookup.db;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.db.JdbcCommon;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

/**
 * Lookup service that resolves a key against one column of a database table and returns
 * the first matching row as a {@link Record}.
 *
 * <p>Results are kept in a Caffeine cache keyed by (table name, lookup key).</p>
 */
@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value", "record"})
@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
        + "the specified columns (or all if Lookup Value Columns are not specified) are returned as a Record. Only one row "
        + "will be returned for each lookup, duplicate database entries are ignored.")
public class DatabaseRecordLookupService extends AbstractDatabaseLookupService implements RecordLookupService {

    private volatile Cache<Tuple<String, Object>, Record> cache;
    private volatile JdbcCommon.AvroConversionOptions options;

    static final PropertyDescriptor LOOKUP_VALUE_COLUMNS = new PropertyDescriptor.Builder()
            .name("dbrecord-lookup-value-columns")
            .displayName("Lookup Value Columns")
            .description("A comma-delimited list of columns in the table that will be returned when the lookup key matches. Note that this may be case-sensitive depending on the database.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /**
     * Registers the property descriptors this service exposes.
     *
     * @param context initialization context (not used here)
     */
    @Override
    protected void init(final ControllerServiceInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(DBCP_SERVICE);
        properties.add(TABLE_NAME);
        properties.add(LOOKUP_KEY_COLUMN);
        properties.add(LOOKUP_VALUE_COLUMNS);
        properties.add(CACHE_SIZE);
        properties.add(CLEAR_CACHE_ON_ENABLED);
        properties.add(CACHE_EXPIRATION);
        this.properties = Collections.unmodifiableList(properties);
    }

    /**
     * Reads the service configuration and (re)builds the cache and the Avro conversion options.
     *
     * <p>The cache is created when none exists yet, or when the configured size is positive and
     * "Clear Cache on Enabled" is true. Otherwise the cache from a previous enablement is kept.</p>
     *
     * @param context the configuration to read property values from
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
        final int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
        final boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet()
                ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS)
                : 0L;
        if (this.cache == null || (cacheSize > 0 && clearCache)) {
            if (durationNanos > 0) {
                this.cache = Caffeine.newBuilder()
                        .maximumSize(cacheSize)
                        .expireAfter(new Expiry<Tuple<String, Object>, Record>() {
                            @Override
                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime) {
                                return durationNanos;
                            }

                            // Updates and reads leave the remaining lifetime untouched, so an entry
                            // expires a fixed interval after it was created.
                            @Override
                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
                                return currentDuration;
                            }

                            @Override
                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Record record, long currentTime, long currentDuration) {
                                return currentDuration;
                            }
                        })
                        .build();
            } else {
                this.cache = Caffeine.newBuilder()
                        .maximumSize(cacheSize)
                        .build();
            }
        }

        options = JdbcCommon.AvroConversionOptions.builder()
                .recordName("NiFi_DB_Record_Lookup")
                // Ignore duplicates
                .maxRows(1)
                // Keep column names as field names
                .convertNames(false)
                .useLogicalTypes(true)
                .build();
    }

    /**
     * Looks up a record without any FlowFile attribute context.
     *
     * @param coordinates map expected to contain the lookup value under {@code "key"}
     * @return the matching record, or empty if there is no usable key or no matching row
     * @throws LookupFailureException if the query or result conversion fails
     */
    @Override
    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
        return lookup(coordinates, null);
    }

    /**
     * Looks up the row whose key column equals the supplied key and returns it as a record.
     *
     * @param coordinates map expected to contain the lookup value under {@code "key"}
     * @param context     attributes used to evaluate the table name and value-column properties; may be null
     * @return the cached or freshly queried record; empty if {@code coordinates} is null, the key is
     *         absent, null or blank, or no row matches
     * @throws LookupFailureException if executing the query or reading the result set fails
     */
    @Override
    public Optional<Record> lookup(final Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
        if (coordinates == null) {
            return Optional.empty();
        }

        final Object key = coordinates.get(KEY);
        // A missing or null key cannot match any row; guard before calling toString() on it.
        if (key == null || StringUtils.isBlank(key.toString())) {
            return Optional.empty();
        }

        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
        final String lookupValueColumnsList = getProperty(LOOKUP_VALUE_COLUMNS).evaluateAttributeExpressions(context).getValue();

        // LinkedHashSet drops duplicate column names while keeping the configured order.
        Set<String> lookupValueColumnsSet = new LinkedHashSet<>();
        if (lookupValueColumnsList != null) {
            Stream.of(lookupValueColumnsList)
                    .flatMap(path -> Arrays.stream(path.split(",")))
                    .filter(DatabaseRecordLookupService::isNotBlank)
                    .map(String::trim)
                    .forEach(lookupValueColumnsSet::add);
        }

        final String lookupValueColumns = lookupValueColumnsSet.isEmpty() ? "*" : String.join(",", lookupValueColumnsSet);

        // NOTE(review): the cache key does not include the column list, although that list is
        // evaluated per call; a record cached for one column list can be returned for another.
        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);

        // Plain cache probe; loading is done below so that checked exceptions can be translated.
        Record foundRecord = cache.getIfPresent(cacheLookupKey);

        if (foundRecord == null) {
            // NOTE(review): table and column names are concatenated because JDBC cannot bind
            // identifiers; they come from service properties that may be evaluated against
            // FlowFile attributes. Only the key value is bound as a parameter.
            final String selectQuery = "SELECT " + lookupValueColumns + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
            try (final Connection con = dbcpService.getConnection(context);
                 final PreparedStatement st = con.prepareStatement(selectQuery)) {

                st.setObject(1, key);
                try (final ResultSet resultSet = st.executeQuery()) {
                    final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
                    final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
                    ResultSetRecordSet resultSetRecordSet = new ResultSetRecordSet(resultSet, recordAvroSchema, true);
                    foundRecord = resultSetRecordSet.next();
                }

                // Populate the cache if the record is present
                if (foundRecord != null) {
                    cache.put(cacheLookupKey, foundRecord);
                }

            } catch (SQLException se) {
                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for value " + key.toString()
                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
            } catch (IOException ioe) {
                throw new LookupFailureException("Error retrieving result set for SQL statement: " + selectQuery + " for value " + key.toString()
                        + " : " + (ioe.getCause() == null ? ioe.getMessage() : ioe.getCause().getMessage()), ioe);
            }
        }

        return Optional.ofNullable(foundRecord);
    }

    /** Returns true when {@code value} is non-null and contains a non-whitespace character. */
    private static boolean isNotBlank(final String value) {
        return value != null && !value.trim().isEmpty();
    }

    /**
     * @return the coordinate names a caller must supply, i.e. {@code "key"}
     */
    @Override
    public Set<String> getRequiredKeys() {
        return REQUIRED_KEYS;
    }
}
package org.apache.nifi.lookup.db;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.Tuple;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Lookup service that resolves a key against one column of a database table and returns the
 * string form of a single value column from the first matching row.
 *
 * <p>Results are kept in a Caffeine cache keyed by (table name, lookup key).</p>
 */
@Tags({"lookup", "cache", "enrich", "join", "rdbms", "database", "reloadable", "key", "value"})
@CapabilityDescription("A relational-database-based lookup service. When the lookup key is found in the database, "
        + "the specified lookup value column is returned. Only one value will be returned for each lookup, duplicate database entries are ignored.")
public class SimpleDatabaseLookupService extends AbstractDatabaseLookupService implements StringLookupService {

    private volatile Cache<Tuple<String, Object>, String> cache;

    static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
            new PropertyDescriptor.Builder()
                    .name("lookup-value-column")
                    .displayName("Lookup Value Column")
                    .description("The column whose value will be returned when the Lookup value is matched")
                    .required(true)
                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                    .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                    .build();

    /**
     * Registers the property descriptors this service exposes.
     *
     * @param context initialization context (not used here)
     */
    @Override
    protected void init(final ControllerServiceInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(DBCP_SERVICE);
        properties.add(TABLE_NAME);
        properties.add(LOOKUP_KEY_COLUMN);
        properties.add(LOOKUP_VALUE_COLUMN);
        properties.add(CACHE_SIZE);
        properties.add(CLEAR_CACHE_ON_ENABLED);
        properties.add(CACHE_EXPIRATION);
        this.properties = Collections.unmodifiableList(properties);
    }

    /**
     * Reads the service configuration and (re)builds the cache.
     *
     * <p>The cache is created when none exists yet, or when the configured size is positive and
     * "Clear Cache on Enabled" is true. Otherwise the cache from a previous enablement is kept.</p>
     *
     * @param context the configuration to read property values from
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
        this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).evaluateAttributeExpressions().getValue();
        int cacheSize = context.getProperty(CACHE_SIZE).evaluateAttributeExpressions().asInteger();
        boolean clearCache = context.getProperty(CLEAR_CACHE_ON_ENABLED).asBoolean();
        final long durationNanos = context.getProperty(CACHE_EXPIRATION).isSet()
                ? context.getProperty(CACHE_EXPIRATION).evaluateAttributeExpressions().asTimePeriod(TimeUnit.NANOSECONDS)
                : 0L;
        if (this.cache == null || (cacheSize > 0 && clearCache)) {
            if (durationNanos > 0) {
                this.cache = Caffeine.newBuilder()
                        .maximumSize(cacheSize)
                        .expireAfter(new Expiry<Tuple<String, Object>, Object>() {
                            @Override
                            public long expireAfterCreate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime) {
                                return durationNanos;
                            }

                            // Updates and reads leave the remaining lifetime untouched, so an entry
                            // expires a fixed interval after it was created.
                            @Override
                            public long expireAfterUpdate(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
                                return currentDuration;
                            }

                            @Override
                            public long expireAfterRead(Tuple<String, Object> stringObjectTuple, Object value, long currentTime, long currentDuration) {
                                return currentDuration;
                            }
                        })
                        .build();
            } else {
                this.cache = Caffeine.newBuilder()
                        .maximumSize(cacheSize)
                        .build();
            }
        }
    }

    /**
     * Looks up a value without any FlowFile attribute context.
     *
     * @param coordinates map expected to contain the lookup value under {@code "key"}
     * @return the matching value, or empty if there is no usable key or no matching row
     * @throws LookupFailureException if the query fails
     */
    @Override
    public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
        return lookup(coordinates, null);
    }

    /**
     * Looks up the row whose key column equals the supplied key and returns the value column as a string.
     *
     * @param coordinates map expected to contain the lookup value under {@code "key"}
     * @param context     attributes used to evaluate the table name and value-column properties; may be null
     * @return the cached or freshly queried value; empty if {@code coordinates} is null, the key is
     *         absent, null or blank, no row matches, or the value column is SQL NULL
     * @throws LookupFailureException if executing the query fails
     */
    @Override
    public Optional<String> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
        if (coordinates == null) {
            return Optional.empty();
        }

        final Object key = coordinates.get(KEY);
        // A missing or null key cannot match any row; guard before calling toString() on it.
        if (key == null || StringUtils.isBlank(key.toString())) {
            return Optional.empty();
        }

        final String tableName = getProperty(TABLE_NAME).evaluateAttributeExpressions(context).getValue();
        final String lookupValueColumn = getProperty(LOOKUP_VALUE_COLUMN).evaluateAttributeExpressions(context).getValue();

        Tuple<String, Object> cacheLookupKey = new Tuple<>(tableName, key);

        // Plain cache probe; loading is done below so that checked exceptions can be translated.
        String foundValue = cache.getIfPresent(cacheLookupKey);

        if (foundValue == null) {
            // NOTE(review): table and column names are concatenated because JDBC cannot bind
            // identifiers; they come from service properties. Only the key value is bound.
            final String selectQuery = "SELECT " + lookupValueColumn + " FROM " + tableName + " WHERE " + lookupKeyColumn + " = ?";
            try (final Connection con = dbcpService.getConnection(context);
                 final PreparedStatement st = con.prepareStatement(selectQuery)) {

                st.setObject(1, key);
                try (final ResultSet resultSet = st.executeQuery()) {
                    if (!resultSet.next()) {
                        return Optional.empty();
                    }

                    final Object o = resultSet.getObject(lookupValueColumn);
                    if (o == null) {
                        // SQL NULL is reported as "not found" and is deliberately not cached.
                        return Optional.empty();
                    }
                    foundValue = o.toString();
                }

                // Populate the cache if a value is present
                if (foundValue != null) {
                    cache.put(cacheLookupKey, foundValue);
                }

            } catch (SQLException se) {
                throw new LookupFailureException("Error executing SQL statement: " + selectQuery + " for value " + key.toString()
                        + " : " + (se.getCause() == null ? se.getMessage() : se.getCause().getMessage()), se);
            }
        }

        return Optional.ofNullable(foundValue);
    }

    /**
     * @return the coordinate names a caller must supply, i.e. {@code "key"}
     */
    @Override
    public Set<String> getRequiredKeys() {
        return REQUIRED_KEYS;
    }
}
org.apache.nifi.lookup.maxmind.IPLookupService org.apache.nifi.lookup.CSVRecordLookupService +org.apache.nifi.lookup.db.DatabaseRecordLookupService org.apache.nifi.lookup.PropertiesFileLookupService org.apache.nifi.lookup.RestLookupService org.apache.nifi.lookup.SimpleKeyValueLookupService org.apache.nifi.lookup.SimpleCsvFileLookupService +org.apache.nifi.lookup.db.SimpleDatabaseLookupService org.apache.nifi.lookup.XMLFileLookupService org.apache.nifi.lookup.DistributedMapCacheLookupService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy new file mode 100644 index 0000000..860a90d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/groovy/org/apache/nifi/lookup/db/TestDatabaseRecordLookupService.groovy @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.lookup.db

import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.lookup.LookupFailureException
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.lookup.TestProcessor
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test

import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.sql.Statement

import static org.hamcrest.CoreMatchers.instanceOf
import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertNull
import static org.junit.Assert.assertThat


/**
 * Exercises DatabaseRecordLookupService against an embedded Derby database holding a
 * two-row TEST table.
 */
class TestDatabaseRecordLookupService {

    private TestRunner runner

    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
    private final static String DB_LOCATION = "target/db"

    @BeforeClass
    static void setupClass() {
        // Keep Derby's log inside the build directory.
        System.setProperty("derby.stream.error.file", "target/derby.log")
    }

    @Before
    void setup() throws InitializationException {
        final DBCPService dbcp = new DBCPServiceSimpleImpl()
        final Map<String, String> dbcpProperties = new HashMap<>()

        runner = TestRunners.newTestRunner(TestProcessor.class)
        runner.addControllerService("dbcp", dbcp, dbcpProperties)
        runner.enableControllerService(dbcp)
    }

    /**
     * Drops (if present), recreates and populates the TEST table used by every test.
     */
    private void recreateTestTable() {
        // remove previous test database, if any
        final File previousDb = new File(DB_LOCATION)
        previousDb.delete()

        // load test data to database
        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
        final Statement stmt = con.createStatement()

        try {
            stmt.execute("drop table TEST")
        } catch (final SQLException ignored) {
            // the drop is best-effort; a failure here is not reported
        }

        stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
        stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
        stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
    }

    /**
     * Registers, configures and enables the service under test, then returns the instance
     * resolved through the runner's controller-service lookup.
     *
     * @param extraProperties additional descriptor-to-value settings applied after the common ones
     */
    private DatabaseRecordLookupService enableLookupService(Map extraProperties = [:]) {
        final DatabaseRecordLookupService service = new DatabaseRecordLookupService()

        runner.addControllerService("db-lookup-service", service)
        runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
        runner.assertNotValid()
        runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
        runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
        for (entry in extraProperties) {
            runner.setProperty(service, entry.key, entry.value)
        }
        runner.enableControllerService(service)
        runner.assertValid(service)

        def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")

        assertThat(lookupService, instanceOf(LookupService.class))
        return lookupService
    }

    @Test
    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
        recreateTestTable()
        def lookupService = enableLookupService()

        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))
        assertEquals("Hello", property1.get().getAsString("VAL2"))

        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
        assertEquals(1, property2.get().getAsInt("VAL1"))
        assertEquals("World", property2.get().getAsString("VAL2"))

        // Key not found
        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
        assertEquals(EMPTY_RECORD, property3)
    }

    @Test
    void testDatabaseLookupServiceSpecifyColumns() throws InitializationException, IOException, LookupFailureException {
        recreateTestTable()
        def lookupService = enableLookupService([(DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS): "val1"])

        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
        assertNull("Should be null but is not", property1.get().getAsInt("VAL1"))

        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
        assertEquals(1, property2.get().getAsInt("VAL1"))

        // Key not found
        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
        assertEquals(EMPTY_RECORD, property3)
    }

    @Test
    void exerciseCacheLogic() {
        recreateTestTable()
        def lookupService = enableLookupService([(DatabaseRecordLookupService.CACHE_SIZE): "10"])

        // Each key is looked up twice so the second call can be answered from the cache.
        final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
        assertEquals(1, property1.get().getAsInt("VAL1"))
        assertEquals("World", property1.get().getAsString("VAL2"))

        final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
        assertEquals(1, property2.get().getAsInt("VAL1"))
        assertEquals("World", property2.get().getAsString("VAL2"))

        final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
        assertNull(property3.get().getAsInt("VAL1"))
        assertEquals("Hello", property3.get().getAsString("VAL2"))

        final Optional<Record> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
        assertNull(property4.get().getAsInt("VAL1"))
        assertEquals("Hello", property4.get().getAsString("VAL2"))
    }

    /**
     * Simple implementation for component testing: hands out connections to the embedded
     * Derby database at DB_LOCATION, creating it on first use.
     */
    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {

        @Override
        String getIdentifier() {
            "dbcp"
        }

        @Override
        Connection getConnection() throws ProcessException {
            try {
                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
                DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
            } catch (e) {
                throw new ProcessException("getConnection failed: " + e);
            }
        }
    }
}
+ */
+package org.apache.nifi.lookup.db
+
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.dbcp.DBCPService
+import org.apache.nifi.lookup.LookupFailureException
+import org.apache.nifi.lookup.LookupService
+import org.apache.nifi.lookup.TestProcessor
+import org.apache.nifi.processor.exception.ProcessException
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.Record
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+import java.sql.Statement
+
+import static org.hamcrest.CoreMatchers.instanceOf
+import static org.junit.Assert.*
+
+/**
+ * Exercises SimpleDatabaseLookupService against an embedded Derby database kept
+ * under DB_LOCATION. The service reaches the database through the nested
+ * DBCPServiceSimpleImpl, which is registered with the test runner as "dbcp".
+ */
+class TestSimpleDatabaseLookupService {
+
+    private TestRunner runner
+
+    private final static Optional<Record> EMPTY_RECORD = Optional.empty()
+    private final static String DB_LOCATION = "target/db"
+
+    /** Points Derby's error log (derby.stream.error.file) at target/derby.log. */
+    @BeforeClass
+    static void setupClass() {
+        System.setProperty("derby.stream.error.file", "target/derby.log")
+    }
+
+    /** Creates a fresh test runner and registers and enables the "dbcp" connection service. */
+    @Before
+    void setup() throws InitializationException {
+        final DBCPService dbcp = new DBCPServiceSimpleImpl()
+        final Map<String, String> dbcpProperties = new HashMap<>()
+
+        runner = TestRunners.newTestRunner(TestProcessor.class)
+        runner.addControllerService("dbcp", dbcp, dbcpProperties)
+        runner.enableControllerService(dbcp)
+    }
+
+    /**
+     * Looks up a single value column: a NULL column value and an unknown key both
+     * yield an empty Optional; after switching the value column to VAL2 the stored
+     * string is returned.
+     */
+    @Test
+    void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1: row 0 stores NULL there, so nothing is present
+        final Optional<String> nullValue = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(nullValue.isPresent())
+        // Key not found
+        final Optional<String> missingKey = lookupService.lookup(Collections.singletonMap("key", "2"))
+        assertEquals(EMPTY_RECORD, missingKey)
+
+        // The value column can only be changed while the service is disabled
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> val2 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", val2.get())
+    }
+
+    /**
+     * Same lookups with CACHE_SIZE set to 10, including a switch of the value column
+     * from VAL1 to VAL2 between lookups of the same keys.
+     * NOTE(review): the assertions only compare values; they do not prove that a
+     * cache was actually consulted.
+     */
+    @Test
+    void exerciseCacheLogic() {
+        loadTestData()
+
+        final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
+
+        runner.addControllerService("db-lookup-service", service)
+        runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
+        runner.assertNotValid()
+        runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
+        runner.setProperty(service, SimpleDatabaseLookupService.CACHE_SIZE, "10")
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
+        runner.enableControllerService(service)
+        runner.assertValid(service)
+
+        def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
+
+        assertThat(lookupService, instanceOf(LookupService.class))
+
+        // Lookup VAL1
+        final Optional<String> val1ForKey1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("1", val1ForKey1.get())
+        final Optional<String> val1ForKey0 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertFalse(val1ForKey0.isPresent())
+
+        // Same keys, different value column: results must reflect VAL2, not the earlier VAL1 answers
+        runner.disableControllerService(service)
+        runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
+        runner.enableControllerService(service)
+        final Optional<String> val2ForKey1 = lookupService.lookup(Collections.singletonMap("key", "1"))
+        assertEquals("World", val2ForKey1.get())
+
+        final Optional<String> val2ForKey0 = lookupService.lookup(Collections.singletonMap("key", "0"))
+        assertEquals("Hello", val2ForKey0.get())
+    }
+
+    /**
+     * (Re)creates table TEST and inserts the two fixture rows (0, NULL, 'Hello') and
+     * (1, 1, 'World'). The JDBC statement and connection are closed before returning,
+     * even when one of the SQL statements fails.
+     */
+    private void loadTestData() {
+        // remove previous test database, if any. File.delete() only removes a file or an
+        // empty directory, so this is best-effort; the "drop table" below is what clears
+        // rows left behind by an earlier test.
+        final File dbLocation = new File(DB_LOCATION)
+        dbLocation.delete()
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
+        try {
+            final Statement stmt = con.createStatement()
+            try {
+                try {
+                    stmt.execute("drop table TEST")
+                } catch (final SQLException ignored) {
+                    // Deliberately ignored: most likely TEST does not exist yet on a fresh database.
+                }
+
+                stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
+                stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
+            } finally {
+                stmt.close()
+            }
+        } finally {
+            con.close()
+        }
+    }
+
+    /**
+     * Simple implementation for component testing: reports the identifier "dbcp" and
+     * opens a new embedded Derby connection to DB_LOCATION on every call.
+     */
+    class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        String getIdentifier() {
+            "dbcp"
+        }
+
+        /**
+         * @return a connection to the Derby database at DB_LOCATION; create=true in the
+         *         URL lets Derby create the database when it is missing
+         * @throws ProcessException if the driver cannot be loaded or the connection fails;
+         *         the original exception is attached as the cause
+         */
+        @Override
+        Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
+                return DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
+            } catch (e) {
+                // Pass e as the cause so the underlying stack trace is not lost.
+                throw new ProcessException("getConnection failed: " + e, e)
+            }
+        }
+    }
+}
\ No newline at end of file
