This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 205b4b7fd4 NIFI-11526 Refactored DatabaseRecordSinkTest to Java from
Groovy
205b4b7fd4 is described below
commit 205b4b7fd42dd0254bcb3d37f033c124358e60e3
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Thu May 4 16:47:25 2023 -0700
NIFI-11526 Refactored DatabaseRecordSinkTest to Java from Groovy
This closes #7229
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 340b5fcb00671e3a23834efc025220998f044764)
---
.../record/sink/db/DatabaseRecordSinkTest.groovy | 325 ------------------
.../record/sink/db/DatabaseRecordSinkTest.java | 363 +++++++++++++++++++++
2 files changed, 363 insertions(+), 325 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
deleted file mode 100644
index 5b1c72eed8..0000000000
---
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/groovy/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.groovy
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.record.sink.db
-
-import org.apache.nifi.attribute.expression.language.StandardPropertyValue
-import org.apache.nifi.components.PropertyValue
-import org.apache.nifi.components.state.StateManager
-import org.apache.nifi.controller.ConfigurationContext
-import org.apache.nifi.controller.ControllerServiceInitializationContext
-import org.apache.nifi.dbcp.DBCPConnectionPool
-import org.apache.nifi.dbcp.DBCPService
-import org.apache.nifi.logging.ComponentLog
-import org.apache.nifi.record.sink.RecordSinkService
-import org.apache.nifi.reporting.InitializationException
-import org.apache.nifi.serialization.RecordSetWriterFactory
-import org.apache.nifi.serialization.SimpleRecordSchema
-import org.apache.nifi.serialization.WriteResult
-import org.apache.nifi.serialization.record.ListRecordSet
-import org.apache.nifi.serialization.record.MapRecord
-import org.apache.nifi.serialization.record.MockRecordWriter
-import org.apache.nifi.serialization.record.RecordField
-import org.apache.nifi.serialization.record.RecordFieldType
-import org.apache.nifi.serialization.record.RecordSchema
-import org.apache.nifi.serialization.record.RecordSet
-import org.apache.nifi.state.MockStateManager
-import org.apache.nifi.util.MockControllerServiceInitializationContext
-import org.apache.nifi.util.MockPropertyValue
-import org.apache.nifi.util.file.FileUtils
-import org.junit.jupiter.api.AfterAll
-import org.junit.jupiter.api.BeforeAll
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.function.Executable
-
-import java.sql.DriverManager
-import java.sql.ResultSet
-import java.sql.SQLException
-import java.sql.SQLNonTransientConnectionException
-import java.sql.Statement
-
-import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL
-import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME
-import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION
-import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD
-import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER
-import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD
-import static
org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
-import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
-import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
-import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME
-import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE
-import static
org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME
-import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY
-import static org.junit.jupiter.api.Assertions.assertEquals
-import static org.junit.jupiter.api.Assertions.assertFalse
-import static org.junit.jupiter.api.Assertions.assertNotNull
-import static org.junit.jupiter.api.Assertions.assertThrows
-import static org.junit.jupiter.api.Assertions.assertTrue
-
-import static org.mockito.Mockito.mock
-import static org.mockito.Mockito.when
-
-class DatabaseRecordSinkTest {
-
- final static String DB_LOCATION = "target/db"
-
- DBCPService dbcpService
-
- @BeforeAll
- static void setup() {
- System.setProperty("derby.stream.error.file", "target/derby.log")
- }
-
- @AfterAll
- static void cleanUpAfterClass() throws Exception {
- try {
- DriverManager.getConnection("jdbc:derby:" + DB_LOCATION +
";shutdown=true")
- } catch (SQLNonTransientConnectionException ignore) {
- // Do nothing, this is what happens at Derby shutdown
- }
- // remove previous test database, if any
- final File dbLocation = new File(DB_LOCATION)
- try {
- FileUtils.deleteFile(dbLocation, true)
- } catch (IOException ignore) {
- // Do nothing, may not have existed
- }
- }
-
- private ConfigurationContext context
-
- @Test
- void testRecordFormat() throws IOException, InitializationException {
- DatabaseRecordSink task = initTask('TESTTABLE')
-
- // Create the table
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
- def con =
DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
- final Statement stmt = con.createStatement()
- try {
- stmt.execute("drop table TESTTABLE")
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
- try {
- stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2
varchar(20))')
- } finally {
- stmt.close()
- }
-
- List<RecordField> recordFields = Arrays.asList(
- new RecordField("field1", RecordFieldType.INT.getDataType()),
- new RecordField("field2", RecordFieldType.STRING.getDataType())
- )
- RecordSchema recordSchema = new SimpleRecordSchema(recordFields)
-
- Map<String, Object> row1 = new HashMap<>()
- row1.put('field1', 15)
- row1.put('field2', 'Hello')
-
- Map<String, Object> row2 = new HashMap<>()
- row2.put('field1', 6)
- row2.put('field2', 'World!')
-
- RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
- new MapRecord(recordSchema, row1),
- new MapRecord(recordSchema, row2)
- ))
-
- WriteResult writeResult = task.sendData(recordSet, ['a': 'Hello'],
true)
- assertNotNull(writeResult)
- assertEquals(2, writeResult.recordCount)
- assertEquals('Hello', writeResult.attributes['a'])
-
- final Statement st = con.createStatement()
- final ResultSet resultSet = st.executeQuery('select * from testTable')
- assertTrue(resultSet.next())
-
- def f1 = resultSet.getObject(1)
- assertNotNull(f1)
- assertTrue(f1 instanceof Integer)
- assertEquals(15, f1)
- def f2 = resultSet.getObject(2)
- assertNotNull(f2)
- assertTrue(f2 instanceof String)
- assertEquals('Hello', f2)
-
- assertTrue(resultSet.next())
-
- f1 = resultSet.getObject(1)
- assertNotNull(f1)
- assertTrue(f1 instanceof Integer)
- assertEquals(6, f1)
- f2 = resultSet.getObject(2)
- assertNotNull(f2)
- assertTrue(f2 instanceof String)
- assertEquals('World!', f2)
-
- assertFalse(resultSet.next())
- }
-
- @Test
- void testMissingTable() throws IOException, InitializationException {
- DatabaseRecordSink task = initTask('NO_SUCH_TABLE')
-
- List<RecordField> recordFields = Arrays.asList(
- new RecordField("field1", RecordFieldType.INT.getDataType()),
- new RecordField("field2", RecordFieldType.STRING.getDataType())
- )
- RecordSchema recordSchema = new SimpleRecordSchema(recordFields)
-
- Map<String, Object> row1 = new HashMap<>()
- row1.put('field1', 15)
- row1.put('field2', 'Hello')
-
- RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)))
- assertThrows(IOException.class, { task.sendData(recordSet, new
HashMap<>(), true) } as Executable,
- 'Should have generated an exception for table not present')
- }
-
- @Test
- void testMissingField() throws IOException, InitializationException {
- DatabaseRecordSink task = initTask('TESTTABLE')
-
- // Create the table
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
- def con =
DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
- final Statement stmt = con.createStatement()
- try {
- stmt.execute("drop table TESTTABLE")
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
- try {
- stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2
varchar(20) not null)')
- } finally {
- stmt.close()
- }
-
- List<RecordField> recordFields = Arrays.asList(
- new RecordField("field1", RecordFieldType.INT.getDataType())
- )
- RecordSchema recordSchema = new SimpleRecordSchema(recordFields)
-
- Map<String, Object> row1 = new HashMap<>()
- row1.put('field1', 15)
- row1.put('field2', 'Hello')
- row1.put('field3', 'fail')
-
- RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)))
- assertThrows(IOException.class, { task.sendData(recordSet, new
HashMap<>(), true) } as Executable,
- 'Should have generated an exception for column not present')
- }
-
- @Test
- void testMissingColumn() throws IOException, InitializationException {
- DatabaseRecordSink task = initTask('TESTTABLE')
-
- // Create the table
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
- def con =
DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
- final Statement stmt = con.createStatement()
- try {
- stmt.execute("drop table TESTTABLE")
- } catch (final SQLException sqle) {
- // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
- }
- try {
- stmt.executeUpdate('CREATE TABLE testTable (field1 integer, field2
varchar(20))')
- } finally {
- stmt.close()
- }
-
- List<RecordField> recordFields = Arrays.asList(
- new RecordField("field1", RecordFieldType.INT.getDataType()),
- new RecordField("field2",
RecordFieldType.STRING.getDataType()),
- new RecordField("field3", RecordFieldType.STRING.getDataType())
- )
- RecordSchema recordSchema = new SimpleRecordSchema(recordFields)
-
- Map<String, Object> row1 = new HashMap<>()
- row1.put('field1', 15)
-
- RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)))
- assertThrows(IOException.class, { task.sendData(recordSet, new
HashMap<>(), true) } as Executable,
- 'Should have generated an exception for field not present')
- }
-
- DatabaseRecordSink initTask(String tableName) throws
InitializationException, IOException {
-
- final ComponentLog logger = mock(ComponentLog.class)
- final DatabaseRecordSink task = new DatabaseRecordSink()
- context = mock(ConfigurationContext.class)
- final StateManager stateManager = new MockStateManager(task)
-
- final PropertyValue pValue = mock(StandardPropertyValue.class)
- final MockRecordWriter writer = new MockRecordWriter(null, false) //
No header, don't quote values
-
when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue)
-
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer)
-
when(context.getProperty(DatabaseRecordSink.CATALOG_NAME)).thenReturn(new
MockPropertyValue(null))
-
when(context.getProperty(DatabaseRecordSink.SCHEMA_NAME)).thenReturn(new
MockPropertyValue(null))
-
when(context.getProperty(DatabaseRecordSink.TABLE_NAME)).thenReturn(new
MockPropertyValue(tableName ?: 'TESTTABLE'))
-
when(context.getProperty(DatabaseRecordSink.QUOTED_IDENTIFIERS)).thenReturn(new
MockPropertyValue('false'))
-
when(context.getProperty(DatabaseRecordSink.QUOTED_TABLE_IDENTIFIER)).thenReturn(new
MockPropertyValue('true'))
-
when(context.getProperty(DatabaseRecordSink.QUERY_TIMEOUT)).thenReturn(new
MockPropertyValue('5 sec'))
-
when(context.getProperty(DatabaseRecordSink.TRANSLATE_FIELD_NAMES)).thenReturn(new
MockPropertyValue('true'))
-
when(context.getProperty(DatabaseRecordSink.UNMATCHED_FIELD_BEHAVIOR)).thenReturn(new
MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_FIELD.value))
-
when(context.getProperty(DatabaseRecordSink.UNMATCHED_COLUMN_BEHAVIOR)).thenReturn(new
MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_COLUMN.value))
-
- // Set up the DBCPService to connect to a temp H2 database
- dbcpService = new DBCPConnectionPool()
-
when(pValue.asControllerService(DBCPService.class)).thenReturn(dbcpService)
-
when(context.getProperty(DatabaseRecordSink.DBCP_SERVICE)).thenReturn(pValue)
-
- final ConfigurationContext dbContext = mock(ConfigurationContext.class)
- final StateManager dbStateManager = new MockStateManager(dbcpService)
-
- when(dbContext.getProperty(DATABASE_URL)).thenReturn(new
MockPropertyValue("jdbc:derby:${DB_LOCATION}"))
- when(dbContext.getProperty(DB_USER)).thenReturn(new
MockPropertyValue(null))
- when(dbContext.getProperty(DB_PASSWORD)).thenReturn(new
MockPropertyValue(null))
- when(dbContext.getProperty(DB_DRIVERNAME)).thenReturn(new
MockPropertyValue('org.apache.derby.jdbc.EmbeddedDriver'))
- when(dbContext.getProperty(DB_DRIVER_LOCATION)).thenReturn(new
MockPropertyValue(''))
- when(dbContext.getProperty(MAX_TOTAL_CONNECTIONS)).thenReturn(new
MockPropertyValue('1'))
- when(dbContext.getProperty(VALIDATION_QUERY)).thenReturn(new
MockPropertyValue(''))
- when(dbContext.getProperty(MAX_WAIT_TIME)).thenReturn(new
MockPropertyValue('5 sec'))
- when(dbContext.getProperty(MIN_IDLE)).thenReturn(new
MockPropertyValue('0'))
- when(dbContext.getProperty(MAX_IDLE)).thenReturn(new
MockPropertyValue('0'))
- when(dbContext.getProperty(MAX_CONN_LIFETIME)).thenReturn(new
MockPropertyValue('5 sec'))
- when(dbContext.getProperty(EVICTION_RUN_PERIOD)).thenReturn(new
MockPropertyValue('5 sec'))
- when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new
MockPropertyValue('5 sec'))
-
when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new
MockPropertyValue('5 sec'))
-
when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new
MockPropertyValue(null))
- when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new
MockPropertyValue(null))
- when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new
MockPropertyValue(null))
- when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new
MockPropertyValue(null))
-
- final ControllerServiceInitializationContext dbInitContext = new
MockControllerServiceInitializationContext(dbcpService,
UUID.randomUUID().toString(), logger, dbStateManager)
- dbcpService.initialize(dbInitContext)
- dbcpService.onConfigured(dbContext)
-
- final ControllerServiceInitializationContext initContext = new
MockControllerServiceInitializationContext(writer,
UUID.randomUUID().toString(), logger, stateManager)
- task.initialize(initContext)
- task.onEnabled(context)
-
- return task
- }
-}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java
new file mode 100644
index 0000000000..d8cc5d0873
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/record/sink/db/DatabaseRecordSinkTest.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.record.sink.db;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPConnectionPool;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.dbcp.utils.DBCPProperties;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ListRecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static
org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE;
+import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD;
+import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DATABASE_URL;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVERNAME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_DRIVER_LOCATION;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.KERBEROS_USER_SERVICE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
+import static
org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
+import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DatabaseRecordSinkTest {
+
+ private static final String SERVICE_ID =
DBCPConnectionPool.class.getName();
+ private static final String DERBY_LOG_PROPERTY = "derby.stream.error.file";
+ private static final String DERBY_SHUTDOWN_STATE = "XJ015";
+ private static final String DB_LOCATION = "target/db";
+
+ private TestRunner runner;
+ private DBCPConnectionPool dbcpService;
+ private File databaseDirectory;
+
+ @BeforeAll
+ public static void setDerbyLog() {
+ final File derbyLog = new File(getSystemTemporaryDirectory(),
"derby.log");
+ derbyLog.deleteOnExit();
+ System.setProperty(DERBY_LOG_PROPERTY, derbyLog.getAbsolutePath());
+ }
+
+ @AfterAll
+ public static void clearDerbyLog() {
+ System.clearProperty(DERBY_LOG_PROPERTY);
+ }
+
+ @BeforeEach
+ public void setService() throws InitializationException {
+ databaseDirectory = getEmptyDirectory();
+
+ dbcpService = new DBCPConnectionPool();
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ runner.addControllerService(SERVICE_ID, dbcpService);
+
+ final String url = String.format("jdbc:derby:%s;create=true",
databaseDirectory);
+ runner.setProperty(dbcpService, DBCPProperties.DATABASE_URL, url);
+ runner.setProperty(dbcpService, DBCPProperties.DB_USER,
String.class.getSimpleName());
+ runner.setProperty(dbcpService, DBCPProperties.DB_PASSWORD,
String.class.getName());
+ runner.setProperty(dbcpService, DBCPProperties.DB_DRIVERNAME,
"org.apache.derby.jdbc.EmbeddedDriver");
+ }
+
+ @AfterEach
+ public void shutdown() throws IOException {
+ if (databaseDirectory.exists()) {
+ final SQLException exception = assertThrows(SQLException.class, ()
-> DriverManager.getConnection("jdbc:derby:;shutdown=true"));
+ assertEquals(DERBY_SHUTDOWN_STATE, exception.getSQLState());
+ FileUtils.deleteFile(databaseDirectory, true);
+ }
+ }
+
+ private ConfigurationContext context;
+
+ @Test
+ void testRecordFormat() throws IOException, InitializationException,
SQLException {
+ final DatabaseRecordSink task = initTask("TESTTABLE");
+
+ // Create the table
+ final Connection con =
DriverManager.getConnection(String.format("jdbc:derby:%s;create=true",
DB_LOCATION));
+ final Statement stmt = con.createStatement();
+ try {
+ stmt.execute("drop table TESTTABLE");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+ try {
+ stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2
varchar(20))");
+ } finally {
+ stmt.close();
+ }
+
+ final List<RecordField> recordFields = Arrays.asList(
+ new RecordField("field1", RecordFieldType.INT.getDataType()),
+ new RecordField("field2", RecordFieldType.STRING.getDataType())
+ );
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+ final Map<String, Object> row1 = new HashMap<>();
+ row1.put("field1", 15);
+ row1.put("field2", "Hello");
+
+ final Map<String, Object> row2 = new HashMap<>();
+ row2.put("field1", 6);
+ row2.put("field2", "World!");
+
+ final RecordSet recordSet = new ListRecordSet(recordSchema,
Arrays.asList(
+ new MapRecord(recordSchema, row1),
+ new MapRecord(recordSchema, row2)
+ ));
+
+ final WriteResult writeResult = task.sendData(recordSet,
Collections.singletonMap("a", "Hello"), true);
+ assertNotNull(writeResult);
+ assertEquals(2, writeResult.getRecordCount());
+ assertEquals("Hello", writeResult.getAttributes().get("a"));
+
+ final Statement st = con.createStatement();
+ final ResultSet resultSet = st.executeQuery("select * from testTable");
+ assertTrue(resultSet.next());
+
+ Object f1 = resultSet.getObject(1);
+ assertNotNull(f1);
+ assertTrue(f1 instanceof Integer);
+ assertEquals(15, f1);
+ Object f2 = resultSet.getObject(2);
+ assertNotNull(f2);
+ assertTrue(f2 instanceof String);
+ assertEquals("Hello", f2);
+
+ assertTrue(resultSet.next());
+
+ f1 = resultSet.getObject(1);
+ assertNotNull(f1);
+ assertTrue(f1 instanceof Integer);
+ assertEquals(6, f1);
+ f2 = resultSet.getObject(2);
+ assertNotNull(f2);
+ assertTrue(f2 instanceof String);
+ assertEquals("World!", f2);
+
+ assertFalse(resultSet.next());
+ }
+
+ @Test
+ void testMissingTable() throws InitializationException {
+ final DatabaseRecordSink task = initTask("NO_SUCH_TABLE");
+
+ final List<RecordField> recordFields = Arrays.asList(
+ new RecordField("field1", RecordFieldType.INT.getDataType()),
+ new RecordField("field2", RecordFieldType.STRING.getDataType())
+ );
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+ final Map<String, Object> row1 = new HashMap<>();
+ row1.put("field1", 15);
+ row1.put("field2", "Hello");
+
+ final RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)));
+ assertThrows(IOException.class, () -> task.sendData(recordSet, new
HashMap<>(), true),
+ "Should have generated an exception for table not present");
+ }
+
+ @Test
+ void testMissingField() throws InitializationException, SQLException {
+ final DatabaseRecordSink task = initTask("TESTTABLE");
+
+ // Create the table
+ final Connection con =
DriverManager.getConnection(String.format("jdbc:derby:%s;create=true",
DB_LOCATION));
+ final Statement stmt = con.createStatement();
+ try {
+ stmt.execute("drop table TESTTABLE");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+ try {
+ stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2
varchar(20) not null)");
+ } finally {
+ stmt.close();
+ }
+
+ final List<RecordField> recordFields = Arrays.asList(
+ new RecordField("field1", RecordFieldType.INT.getDataType())
+ );
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+ final Map<String, Object> row1 = new HashMap<>();
+ row1.put("field1", 15);
+ row1.put("field2", "Hello");
+ row1.put("field3", "fail");
+
+ final RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)));
+ assertThrows(IOException.class, () -> task.sendData(recordSet, new
HashMap<>(), true),
+ "Should have generated an exception for column not present");
+
+ }
+
+ @Test
+ void testMissingColumn() throws InitializationException, SQLException {
+ final DatabaseRecordSink task = initTask("TESTTABLE");
+
+ // Create the table
+ final Connection con =
DriverManager.getConnection(String.format("jdbc:derby:%s;create=true",
DB_LOCATION));
+ final Statement stmt = con.createStatement();
+ try {
+ stmt.execute("drop table TESTTABLE");
+ } catch (final SQLException sqle) {
+ // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
+ }
+ try {
+ stmt.executeUpdate("CREATE TABLE testTable (field1 integer, field2
varchar(20))");
+ } finally {
+ stmt.close();
+ }
+
+ final List<RecordField> recordFields = Arrays.asList(
+ new RecordField("field1", RecordFieldType.INT.getDataType()),
+ new RecordField("field2",
RecordFieldType.STRING.getDataType()),
+ new RecordField("field3", RecordFieldType.STRING.getDataType())
+ );
+ final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+
+ final Map<String, Object> row1 = new HashMap<>();
+ row1.put("field1", 15);
+
+ final RecordSet recordSet = new ListRecordSet(recordSchema,
Collections.singletonList(new MapRecord(recordSchema, row1)));
+ assertThrows(IOException.class, () -> task.sendData(recordSet, new
HashMap<>(), true),
+ "Should have generated an exception for field not present");
+ }
+
+ private DatabaseRecordSink initTask(String tableName) throws
InitializationException {
+ final ComponentLog logger = mock(ComponentLog.class);
+ final DatabaseRecordSink task = new DatabaseRecordSink();
+ context = mock(ConfigurationContext.class);
+ final StateManager stateManager = new MockStateManager(task);
+
+ final PropertyValue pValue = mock(StandardPropertyValue.class);
+ final MockRecordWriter writer = new MockRecordWriter(null, false); //
No header, don't quote values
+
when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue);
+
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
+
when(context.getProperty(DatabaseRecordSink.CATALOG_NAME)).thenReturn(new
MockPropertyValue(null));
+
when(context.getProperty(DatabaseRecordSink.SCHEMA_NAME)).thenReturn(new
MockPropertyValue(null));
+
when(context.getProperty(DatabaseRecordSink.TABLE_NAME)).thenReturn(new
MockPropertyValue(tableName != null ? tableName : "TESTTABLE"));
+
when(context.getProperty(DatabaseRecordSink.QUOTED_IDENTIFIERS)).thenReturn(new
MockPropertyValue("false"));
+
when(context.getProperty(DatabaseRecordSink.QUOTED_TABLE_IDENTIFIER)).thenReturn(new
MockPropertyValue("true"));
+
when(context.getProperty(DatabaseRecordSink.QUERY_TIMEOUT)).thenReturn(new
MockPropertyValue("5 sec"));
+
when(context.getProperty(DatabaseRecordSink.TRANSLATE_FIELD_NAMES)).thenReturn(new
MockPropertyValue("true"));
+
when(context.getProperty(DatabaseRecordSink.UNMATCHED_FIELD_BEHAVIOR)).thenReturn(new
MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_FIELD.getValue()));
+
when(context.getProperty(DatabaseRecordSink.UNMATCHED_COLUMN_BEHAVIOR)).thenReturn(new
MockPropertyValue(DatabaseRecordSink.FAIL_UNMATCHED_COLUMN.getValue()));
+
+ // Set up the DBCPService to connect to a temp Derby database
+
when(pValue.asControllerService(DBCPService.class)).thenReturn(dbcpService);
+
when(context.getProperty(DatabaseRecordSink.DBCP_SERVICE)).thenReturn(pValue);
+
+ final ConfigurationContext dbContext =
mock(ConfigurationContext.class);
+ final StateManager dbStateManager = new MockStateManager(dbcpService);
+
+ when(dbContext.getProperty(DATABASE_URL)).thenReturn(new
MockPropertyValue(String.format("jdbc:derby:%s", DB_LOCATION)));
+ when(dbContext.getProperty(DB_USER)).thenReturn(new
MockPropertyValue(null));
+ when(dbContext.getProperty(DB_PASSWORD)).thenReturn(new
MockPropertyValue(null));
+ when(dbContext.getProperty(DB_DRIVERNAME)).thenReturn(new
MockPropertyValue("org.apache.derby.jdbc.EmbeddedDriver"));
+ when(dbContext.getProperty(DB_DRIVER_LOCATION)).thenReturn(new
MockPropertyValue(""));
+ when(dbContext.getProperty(MAX_TOTAL_CONNECTIONS)).thenReturn(new
MockPropertyValue("1"));
+ when(dbContext.getProperty(VALIDATION_QUERY)).thenReturn(new
MockPropertyValue(""));
+ when(dbContext.getProperty(MAX_WAIT_TIME)).thenReturn(new
MockPropertyValue("5 sec"));
+ when(dbContext.getProperty(MIN_IDLE)).thenReturn(new
MockPropertyValue("0"));
+ when(dbContext.getProperty(MAX_IDLE)).thenReturn(new
MockPropertyValue("0"));
+ when(dbContext.getProperty(MAX_CONN_LIFETIME)).thenReturn(new
MockPropertyValue("5 sec"));
+ when(dbContext.getProperty(EVICTION_RUN_PERIOD)).thenReturn(new
MockPropertyValue("5 sec"));
+ when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new
MockPropertyValue("5 sec"));
+
when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new
MockPropertyValue("5 sec"));
+
when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new
MockPropertyValue(null));
+ when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new
MockPropertyValue(null));
+ when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new
MockPropertyValue(null));
+ when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new
MockPropertyValue(null));
+
+ final ControllerServiceInitializationContext dbInitContext = new
MockControllerServiceInitializationContext(dbcpService,
UUID.randomUUID().toString(), logger, dbStateManager);
+ dbcpService.initialize(dbInitContext);
+ dbcpService.onConfigured(dbContext);
+
+ final ControllerServiceInitializationContext initContext = new
MockControllerServiceInitializationContext(writer,
UUID.randomUUID().toString(), logger, stateManager);
+ task.initialize(initContext);
+ task.onEnabled(context);
+
+ return task;
+ }
+
+ private File getEmptyDirectory() {
+ final String randomDirectory = String.format("%s-%s",
getClass().getSimpleName(), UUID.randomUUID());
+ return Paths.get(getSystemTemporaryDirectory(),
randomDirectory).toFile();
+ }
+
+ private static String getSystemTemporaryDirectory() {
+ return System.getProperty("java.io.tmpdir");
+ }
+}