This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 38d26da  NIFI-9192: ResultSetRecordSet considers value of 
useLogicalTypes flag when determining the object's schema.
38d26da is described below

commit 38d26dafa36888826b1017f9f36725628dc40430
Author: Peter Gyori <[email protected]>
AuthorDate: Tue Sep 14 17:04:54 2021 +0200

    NIFI-9192: ResultSetRecordSet considers value of useLogicalTypes flag when 
determining the object's schema.
    
    NIFI-9192: ResultSetRecordSet considers value of useLogicalTypes flag when 
sqlType is not handled separately (default) and readerSchema is not null
    
    Signed-off-by: Peter Gyori <[email protected]>
    
    NIFI-9192: Unit tests added to test the use of useLogicalTypes flag
    
    Signed-off-by: Peter Gyori <[email protected]>
    
    NIFI-9192: ResultSetRecordSet considers not using logical types when sqlType 
is OTHER
    
    NIFI-9192: Fixed checkstyle violations
    
    Signed-off-by: Peter Gyori <[email protected]>
    
    NIFI-9192: Renamed JdbcCommon.getUseLogicalTypes to isUseLogicalTypes
    
    Signed-off-by: Peter Gyori <[email protected]>
    
    NIFI-9192: Added unit tests for Array types with and without using logical 
types
    
    Signed-off-by: Peter Gyori <[email protected]>
    
    NIFI-9192: Review comments considered and unit tests refactored
    
    NIFI-9192: Added further unit tests
    
    NIFI-9192: Minor modifications based on review comments
    
    Signed-off-by: Peter Gyori <[email protected]>
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #5388
---
 .../serialization/record/ResultSetRecordSet.java   | 173 +++---
 .../record/ResultSetRecordSetTest.java             | 585 ++++++++++++++++++---
 .../java/org/apache/nifi/util/db/JdbcCommon.java   |   4 +
 .../processors/standard/sql/RecordSqlWriter.java   |   6 +-
 4 files changed, 641 insertions(+), 127 deletions(-)

diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 7bf48f0..37d5f19 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -68,18 +68,22 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
     }
 
     public ResultSetRecordSet(final ResultSet rs, final RecordSchema 
readerSchema, final int defaultPrecision, final int defaultScale) throws 
SQLException {
+        this(rs, readerSchema, defaultPrecision, defaultScale, true);
+    }
+
+    public ResultSetRecordSet(final ResultSet rs, final RecordSchema 
readerSchema, final int defaultPrecision, final int defaultScale, final boolean 
useLogicalTypes) throws SQLException {
         this.defaultPrecision = defaultPrecision;
         this.defaultScale = defaultScale;
         this.rs = rs;
         this.rsColumnNames = new HashSet<>();
         RecordSchema tempSchema;
         try {
-            tempSchema = createSchema(rs, readerSchema);
+            tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
             moreRows = rs.next();
         } catch(SQLException se) {
             // Tried to create the schema with a ResultSet without calling 
next() first (probably for DB2), now try the other way around
             moreRows = rs.next();
-            tempSchema = createSchema(rs, readerSchema);
+            tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
         }
         this.schema = tempSchema;
     }
@@ -168,7 +172,7 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
         return value;
     }
 
-    private RecordSchema createSchema(final ResultSet rs, final RecordSchema 
readerSchema) throws SQLException {
+    private RecordSchema createSchema(final ResultSet rs, final RecordSchema 
readerSchema, final boolean useLogicalTypes) throws SQLException {
         final ResultSetMetaData metadata = rs.getMetaData();
         final int numCols = metadata.getColumnCount();
         final List<RecordField> fields = new ArrayList<>(numCols);
@@ -177,7 +181,7 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
             final int column = i + 1;
             final int sqlType = metadata.getColumnType(column);
 
-            final DataType dataType = getDataType(sqlType, rs, column, 
readerSchema);
+            final DataType dataType = getDataType(sqlType, rs, column, 
readerSchema, useLogicalTypes);
             final String fieldName = metadata.getColumnLabel(column);
 
             final int nullableFlag = metadata.isNullable(column);
@@ -196,56 +200,22 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
         return new SimpleRecordSchema(fields);
     }
 
-    private DataType getDataType(final int sqlType, final ResultSet rs, final 
int columnIndex, final RecordSchema readerSchema) throws SQLException {
+    private DataType getDataType(final int sqlType, final ResultSet rs, final 
int columnIndex, final RecordSchema readerSchema, final boolean useLogicalTypes)
+            throws SQLException {
         switch (sqlType) {
             case Types.ARRAY:
-                // The JDBC API does not allow us to know what the base type 
of an array is through the metadata.
-                // As a result, we have to obtain the actual Array for this 
record. Once we have this, we can determine
-                // the base type. However, if the base type is, itself, an 
array, we will simply return a base type of
-                // String because otherwise, we need the ResultSet for the 
array itself, and many JDBC Drivers do not
-                // support calling Array.getResultSet() and will throw an 
Exception if that is not supported.
-                try {
-                    final Array array = rs.getArray(columnIndex);
-
-                    if (array == null) {
-                        return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
-                    }
-                    final DataType baseType = getArrayBaseType(array);
-                    return RecordFieldType.ARRAY.getArrayDataType(baseType);
-                } catch (SQLFeatureNotSupportedException sfnse) {
-                    return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
-                }
+                return getArrayDataType(rs, columnIndex, useLogicalTypes);
             case Types.BINARY:
             case Types.LONGVARBINARY:
             case Types.VARBINARY:
                 return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
             case Types.NUMERIC:
             case Types.DECIMAL:
-                int decimalPrecision;
-                final int decimalScale;
-                final int resultSetPrecision = 
rs.getMetaData().getPrecision(columnIndex);
-                final int resultSetScale = 
rs.getMetaData().getScale(columnIndex);
-                if (rs.getMetaData().getPrecision(columnIndex) > 0) {
-                    // When database returns a certain precision, we can rely 
on that.
-                    decimalPrecision = resultSetPrecision;
-                    //For the float data type Oracle return decimalScale < 0 
which cause is not expected to org.apache.avro.LogicalTypes
-                    //Hence falling back to default scale if decimalScale < 0
-                    decimalScale = resultSetScale > 0 ? resultSetScale : 
defaultScale;
+                if (useLogicalTypes) {
+                    return getDecimalDataType(rs, columnIndex);
                 } else {
-                    // If not, use default precision.
-                    decimalPrecision = defaultPrecision;
-                    // Oracle returns precision=0, scale=-127 for variable 
scale value such as ROWNUM or function result.
-                    // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty 
makes it to return scale=0 instead.
-                    // Queries for example, 'SELECT 1.23 as v from DUAL' can 
be problematic because it can't be mapped with decimal with scale=0.
-                    // Default scale is used to preserve decimals in such case.
-                    decimalScale = resultSetScale > 0 ? resultSetScale : 
defaultScale;
+                    return RecordFieldType.STRING.getDataType();
                 }
-                // Scale can be bigger than precision in some cases (Oracle, 
e.g.) If this is the case, assume precision refers to the number of
-                // decimal digits and thus precision = scale
-                if (decimalScale > decimalPrecision) {
-                    decimalPrecision = decimalScale;
-                }
-                return 
RecordFieldType.DECIMAL.getDecimalDataType(decimalPrecision, decimalScale);
             case Types.OTHER: {
                 // If we have no records to inspect, we can't really know its 
schema so we simply use the default data type.
                 if (rs.isAfterLast()) {
@@ -257,10 +227,14 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                 if (readerSchema != null) {
                     Optional<DataType> dataType = 
readerSchema.getDataType(columnName);
                     if (dataType.isPresent()) {
-                        return dataType.get();
+                        return determineDataTypeToReturn(dataType.get(), 
useLogicalTypes);
                     }
                 }
 
+                if (!useLogicalTypes) {
+                    return RecordFieldType.STRING.getDataType();
+                }
+
                 final Object obj = rs.getObject(columnIndex);
                 if (!(obj instanceof Record)) {
                     final List<DataType> dataTypes = 
Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, 
RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
@@ -282,11 +256,11 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                 if (readerSchema != null) {
                     Optional<DataType> dataType = 
readerSchema.getDataType(columnName);
                     if (dataType.isPresent()) {
-                        return dataType.get();
+                        return determineDataTypeToReturn(dataType.get(), 
useLogicalTypes);
                     }
                 }
 
-                final RecordFieldType fieldType = getFieldType(sqlType, 
rs.getMetaData().getColumnClassName(columnIndex));
+                final RecordFieldType fieldType = getFieldType(sqlType, 
rs.getMetaData().getColumnClassName(columnIndex), useLogicalTypes);
 
                 if (RecordFieldType.DECIMAL.equals(fieldType)) {
                     final BigDecimal bigDecimalValue = 
rs.getBigDecimal(columnIndex);
@@ -298,7 +272,67 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
         }
     }
 
-    private static DataType getArrayBaseType(final Array array) throws 
SQLException {
+    private DataType determineDataTypeToReturn(final DataType dataType, final 
boolean useLogicalTypes) {
+        RecordFieldType fieldType = dataType.getFieldType();
+        if (!useLogicalTypes
+                && (fieldType == RecordFieldType.DECIMAL
+                || fieldType == RecordFieldType.DATE
+                || fieldType == RecordFieldType.TIME
+                || fieldType == RecordFieldType.TIMESTAMP)) {
+            return RecordFieldType.STRING.getDataType();
+        } else {
+            return dataType;
+        }
+    }
+
+    private DataType getArrayDataType(final ResultSet rs, final int 
columnIndex, final boolean useLogicalTypes) throws SQLException {
+        // The JDBC API does not allow us to know what the base type of an 
array is through the metadata.
+        // As a result, we have to obtain the actual Array for this record. 
Once we have this, we can determine
+        // the base type. However, if the base type is, itself, an array, we 
will simply return a base type of
+        // String because otherwise, we need the ResultSet for the array 
itself, and many JDBC Drivers do not
+        // support calling Array.getResultSet() and will throw an Exception if 
that is not supported.
+        try {
+            final Array array = rs.getArray(columnIndex);
+
+            if (array == null) {
+                return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+            }
+            final DataType baseType = getArrayBaseType(array, useLogicalTypes);
+            return RecordFieldType.ARRAY.getArrayDataType(baseType);
+        } catch (SQLFeatureNotSupportedException sfnse) {
+            return 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+        }
+    }
+
+    private DataType getDecimalDataType(final ResultSet rs, final int 
columnIndex) throws SQLException {
+        int decimalPrecision;
+        final int decimalScale;
+        final int resultSetPrecision = 
rs.getMetaData().getPrecision(columnIndex);
+        final int resultSetScale = rs.getMetaData().getScale(columnIndex);
+        if (resultSetPrecision > 0) {
+            // When database returns a certain precision, we can rely on that.
+            decimalPrecision = resultSetPrecision;
+            //For the float data type Oracle return decimalScale < 0 which 
cause is not expected to org.apache.avro.LogicalTypes
+            //Hence falling back to default scale if decimalScale < 0
+            decimalScale = resultSetScale > 0 ? resultSetScale : defaultScale;
+        } else {
+            // If not, use default precision.
+            decimalPrecision = defaultPrecision;
+            // Oracle returns precision=0, scale=-127 for variable scale value 
such as ROWNUM or function result.
+            // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty makes 
it to return scale=0 instead.
+            // Queries for example, 'SELECT 1.23 as v from DUAL' can be 
problematic because it can't be mapped with decimal with scale=0.
+            // Default scale is used to preserve decimals in such case.
+            decimalScale = resultSetScale > 0 ? resultSetScale : defaultScale;
+        }
+        // Scale can be bigger than precision in some cases (Oracle, e.g.) If 
this is the case, assume precision refers to the number of
+        // decimal digits and thus precision = scale
+        if (decimalScale > decimalPrecision) {
+            decimalPrecision = decimalScale;
+        }
+        return RecordFieldType.DECIMAL.getDecimalDataType(decimalPrecision, 
decimalScale);
+    }
+
+    private static DataType getArrayBaseType(final Array array, final boolean 
useLogicalTypes) throws SQLException {
         final Object arrayValue = array.getArray();
         if (arrayValue == null) {
             return RecordFieldType.STRING.getDataType();
@@ -367,8 +401,12 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                 return RecordFieldType.DOUBLE.getDataType();
             }
             if (valueToLookAt instanceof BigDecimal) {
-                final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
-                return 
RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), 
bigDecimal.scale());
+                if (useLogicalTypes) {
+                    final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
+                    return 
RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(), 
bigDecimal.scale());
+                } else {
+                    return RecordFieldType.STRING.getDataType();
+                }
             }
             if (valueToLookAt instanceof Boolean) {
                 return RecordFieldType.BOOLEAN.getDataType();
@@ -383,13 +421,13 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                 return RecordFieldType.INT.getDataType();
             }
             if (valueToLookAt instanceof java.sql.Time) {
-                return RecordFieldType.TIME.getDataType();
+                return getDataType(RecordFieldType.TIME, useLogicalTypes);
             }
             if (valueToLookAt instanceof java.sql.Date) {
-                return RecordFieldType.DATE.getDataType();
+                return getDataType(RecordFieldType.DATE, useLogicalTypes);
             }
             if (valueToLookAt instanceof java.sql.Timestamp) {
-                return RecordFieldType.TIMESTAMP.getDataType();
+                return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes);
             }
             if (valueToLookAt instanceof Record) {
                 final Record record = (Record) valueToLookAt;
@@ -400,8 +438,15 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
         return RecordFieldType.STRING.getDataType();
     }
 
+    private static DataType getDataType(final RecordFieldType recordFieldType, 
final boolean useLogicalTypes) {
+        if (useLogicalTypes) {
+            return recordFieldType.getDataType();
+        } else {
+            return RecordFieldType.STRING.getDataType();
+        }
+    }
 
-    private static RecordFieldType getFieldType(final int sqlType, final 
String valueClassName) {
+    private static RecordFieldType getFieldType(final int sqlType, final 
String valueClassName, final boolean useLogicalTypes) {
         switch (sqlType) {
             case Types.BIGINT:
             case Types.ROWID:
@@ -412,10 +457,10 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
             case Types.CHAR:
                 return RecordFieldType.CHAR;
             case Types.DATE:
-                return RecordFieldType.DATE;
+                return getRecordFieldType(RecordFieldType.DATE, 
useLogicalTypes);
             case Types.NUMERIC:
             case Types.DECIMAL:
-                return RecordFieldType.DECIMAL;
+                return getRecordFieldType(RecordFieldType.DECIMAL, 
useLogicalTypes);
             case Types.DOUBLE:
             case Types.REAL:
                 return RecordFieldType.DOUBLE;
@@ -446,7 +491,7 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                     return RecordFieldType.LONG;
                 }
                 if (DATE_CLASS_NAME.equals(valueClassName)) {
-                    return RecordFieldType.DATE;
+                    return getRecordFieldType(RecordFieldType.DATE, 
useLogicalTypes);
                 }
                 if (FLOAT_CLASS_NAME.equals(valueClassName)) {
                     return RecordFieldType.FLOAT;
@@ -455,20 +500,28 @@ public class ResultSetRecordSet implements RecordSet, 
Closeable {
                     return RecordFieldType.DOUBLE;
                 }
                 if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) {
-                    return RecordFieldType.DECIMAL;
+                    return getRecordFieldType(RecordFieldType.DECIMAL, 
useLogicalTypes);
                 }
 
                 return RecordFieldType.RECORD;
             case Types.TIME:
             case Types.TIME_WITH_TIMEZONE:
-                return RecordFieldType.TIME;
+                return getRecordFieldType(RecordFieldType.TIME, 
useLogicalTypes);
             case Types.TIMESTAMP:
             case Types.TIMESTAMP_WITH_TIMEZONE:
             case -101: // Oracle's TIMESTAMP WITH TIME ZONE
             case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
-                return RecordFieldType.TIMESTAMP;
+                return getRecordFieldType(RecordFieldType.TIMESTAMP, 
useLogicalTypes);
         }
 
         return RecordFieldType.STRING;
     }
+
+    private static RecordFieldType getRecordFieldType(final RecordFieldType 
recordFieldType, final boolean useLogicalTypes) {
+        if (useLogicalTypes) {
+            return recordFieldType;
+        } else {
+            return RecordFieldType.STRING;
+        }
+    }
 }
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index 3cefc4e..10fd23f 100644
--- 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.serialization.record;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,21 +28,30 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Date;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.time.LocalDate;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -66,26 +76,27 @@ public class ResultSetRecordSetTest {
     private static final String COLUMN_NAME_BIG_DECIMAL_4 = "bigDecimal4";
     private static final String COLUMN_NAME_BIG_DECIMAL_5 = "bigDecimal5";
 
-    private static final Object[][] COLUMNS = new Object[][] {
-            // column number; column label / name / schema field; column type; 
schema data type;
-            {1, COLUMN_NAME_VARCHAR, Types.VARCHAR, 
RecordFieldType.STRING.getDataType()},
-            {2, COLUMN_NAME_BIGINT, Types.BIGINT, 
RecordFieldType.LONG.getDataType()},
-            {3, COLUMN_NAME_ROWID, Types.ROWID, 
RecordFieldType.LONG.getDataType()},
-            {4, COLUMN_NAME_BIT, Types.BIT, 
RecordFieldType.BOOLEAN.getDataType()},
-            {5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, 
RecordFieldType.BOOLEAN.getDataType()},
-            {6, COLUMN_NAME_CHAR, Types.CHAR, 
RecordFieldType.CHAR.getDataType()},
-            {7, COLUMN_NAME_DATE, Types.DATE, 
RecordFieldType.DATE.getDataType()},
-            {8, COLUMN_NAME_INTEGER, Types.INTEGER, 
RecordFieldType.INT.getDataType()},
-            {9, COLUMN_NAME_DOUBLE, Types.DOUBLE, 
RecordFieldType.DOUBLE.getDataType()},
-            {10, COLUMN_NAME_REAL, Types.REAL, 
RecordFieldType.DOUBLE.getDataType()},
-            {11, COLUMN_NAME_FLOAT, Types.FLOAT, 
RecordFieldType.FLOAT.getDataType()},
-            {12, COLUMN_NAME_SMALLINT, Types.SMALLINT, 
RecordFieldType.SHORT.getDataType()},
-            {13, COLUMN_NAME_TINYINT, Types.TINYINT, 
RecordFieldType.BYTE.getDataType()},
-            {14, COLUMN_NAME_BIG_DECIMAL_1, 
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
-            {15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, 
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
-            {16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, 
RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
-            {17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
-            {18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(3, 10)},
+    private static final long TIMESTAMP_IN_MILLIS = 1631809132516L;
+
+    private static final TestColumn[] COLUMNS = new TestColumn[] {
+            new TestColumn(1, COLUMN_NAME_VARCHAR, Types.VARCHAR, 
RecordFieldType.STRING.getDataType()),
+            new TestColumn(2, COLUMN_NAME_BIGINT, Types.BIGINT, 
RecordFieldType.LONG.getDataType()),
+            new TestColumn(3, COLUMN_NAME_ROWID, Types.ROWID, 
RecordFieldType.LONG.getDataType()),
+            new TestColumn(4, COLUMN_NAME_BIT, Types.BIT, 
RecordFieldType.BOOLEAN.getDataType()),
+            new TestColumn(5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN, 
RecordFieldType.BOOLEAN.getDataType()),
+            new TestColumn(6, COLUMN_NAME_CHAR, Types.CHAR, 
RecordFieldType.CHAR.getDataType()),
+            new TestColumn(7, COLUMN_NAME_DATE, Types.DATE, 
RecordFieldType.DATE.getDataType()),
+            new TestColumn(8, COLUMN_NAME_INTEGER, Types.INTEGER, 
RecordFieldType.INT.getDataType()),
+            new TestColumn(9, COLUMN_NAME_DOUBLE, Types.DOUBLE, 
RecordFieldType.DOUBLE.getDataType()),
+            new TestColumn(10, COLUMN_NAME_REAL, Types.REAL, 
RecordFieldType.DOUBLE.getDataType()),
+            new TestColumn(11, COLUMN_NAME_FLOAT, Types.FLOAT, 
RecordFieldType.FLOAT.getDataType()),
+            new TestColumn(12, COLUMN_NAME_SMALLINT, Types.SMALLINT, 
RecordFieldType.SHORT.getDataType()),
+            new TestColumn(13, COLUMN_NAME_TINYINT, Types.TINYINT, 
RecordFieldType.BYTE.getDataType()),
+            new TestColumn(14, COLUMN_NAME_BIG_DECIMAL_1, 
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)),
+            new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, 
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
+            new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT, 
RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
+            new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
+            new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(3, 10))
     };
 
     @Mock
@@ -96,49 +107,34 @@ public class ResultSetRecordSetTest {
 
     @Before
     public void setUp() throws SQLException {
-        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
-        when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
-
-        for (final Object[] column : COLUMNS) {
-            when(resultSetMetaData.getColumnLabel((Integer) 
column[0])).thenReturn((String) (column[1]));
-            when(resultSetMetaData.getColumnName((Integer) 
column[0])).thenReturn((String) column[1]);
-            when(resultSetMetaData.getColumnType((Integer) 
column[0])).thenReturn((Integer) column[2]);
-
-            if(column[3] instanceof DecimalDataType) {
-                DecimalDataType ddt = (DecimalDataType)column[3];
-                when(resultSetMetaData.getPrecision((Integer) 
column[0])).thenReturn(ddt.getPrecision());
-                when(resultSetMetaData.getScale((Integer) 
column[0])).thenReturn(ddt.getScale());
-            }
-        }
-
-        // Big decimal values are necessary in order to determine precision 
and scale
-        when(resultSet.getBigDecimal(16)).thenReturn(new 
BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
-
-        // This will be handled by a dedicated branch for Java Objects, needs 
some further details
-        
when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
+        setUpMocks(COLUMNS, resultSetMetaData, resultSet);
     }
 
     @Test
     public void testCreateSchema() throws SQLException {
         // given
-        final RecordSchema recordSchema = givenRecordSchema();
+        final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
+        final RecordSchema expectedSchema = givenRecordSchema(COLUMNS);
 
         // when
         final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, recordSchema);
-        final RecordSchema resultSchema = testSubject.getSchema();
+        final RecordSchema actualSchema = testSubject.getSchema();
 
         // then
-        thenAllColumnDataTypesAreCorrect(resultSchema);
+        thenAllColumnDataTypesAreCorrect(COLUMNS, expectedSchema, 
actualSchema);
     }
 
     @Test
     public void testCreateSchemaWhenNoRecordSchema() throws SQLException {
+        // given
+        final RecordSchema expectedSchema = givenRecordSchema(COLUMNS);
+
         // when
         final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, null);
-        final RecordSchema resultSchema = testSubject.getSchema();
+        final RecordSchema actualSchema = testSubject.getSchema();
 
         // then
-        thenAllColumnDataTypesAreCorrect(resultSchema);
+        thenAllColumnDataTypesAreCorrect(COLUMNS, expectedSchema, 
actualSchema);
     }
 
     @Test
@@ -147,7 +143,7 @@ public class ResultSetRecordSetTest {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("column", 
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
         final RecordSchema recordSchema = new SimpleRecordSchema(fields);
-        final ResultSet resultSet = givenResultSetForOther();
+        final ResultSet resultSet = givenResultSetForOther(fields);
 
         // when
         final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, recordSchema);
@@ -158,9 +154,86 @@ public class ResultSetRecordSetTest {
     }
 
     @Test
+    public void testCreateSchemaWhenOtherTypeUsingLogicalTypes() throws 
SQLException {
+        // given
+        final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForOther(fields);
+
+        // when
+        final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, recordSchema, 10, 0, true);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesMatchInputFieldType(fields, resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeAndNoLogicalTypes() throws 
SQLException {
+        // given
+        final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+        final ResultSet resultSet = givenResultSetForOther(fields);
+
+        // when
+        final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, recordSchema, 10, 0, false);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesAreString(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeUsingLogicalTypesNoSchema() 
throws SQLException {
+        // given
+        final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+        final ResultSet resultSet = givenResultSetForOther(fields);
+
+        // when
+        final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, null, 10, 0, true);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesAreChoice(fields, resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeAndNoLogicalTypesNoSchema() 
throws SQLException {
+        // given
+        final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+        final ResultSet resultSet = givenResultSetForOther(fields);
+
+        // when
+        final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, null, 10, 0, false);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesAreString(resultSchema);
+    }
+
+    @Test
+    public void testCreateSchemaWhenOtherTypeUsingLogicalTypesWithRecord() 
throws SQLException {
+        // given
+        final Record inputRecord = givenInputRecord(); // The field's type is 
going to be RECORD (there is a record within a record)
+        final List<RecordField> fields = 
givenFieldsThatAreOfTypeRecord(Arrays.asList(inputRecord));
+        final ResultSet resultSet = givenResultSetForOther(fields);
+
+        when(resultSet.getObject(1)).thenReturn(inputRecord);
+
+        // when
+        final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, null, 10, 0, true);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesMatchInputFieldType(fields, resultSchema);
+    }
+
+    @Test
     public void testCreateSchemaWhenOtherTypeWithoutSchema() throws 
SQLException {
         // given
-        final ResultSet resultSet = givenResultSetForOther();
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", 
RecordFieldType.STRING.getDataType()));
+        final ResultSet resultSet = givenResultSetForOther(fields);
 
         // when
         final ResultSetRecordSet testSubject = new 
ResultSetRecordSet(resultSet, null);
@@ -173,7 +246,7 @@ public class ResultSetRecordSetTest {
     @Test
     public void testCreateRecord() throws SQLException {
         // given
-        final RecordSchema recordSchema = givenRecordSchema();
+        final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
 
         LocalDate testDate = LocalDate.of(2021, 1, 26);
 
@@ -257,6 +330,28 @@ public class ResultSetRecordSetTest {
     }
 
     @Test
+    public void 
testCreateSchemaThrowsExceptionSchemaCreationStillCalledConsideringLogicalTypeFlag()
 throws SQLException {
+        // given
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("column", 
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final ResultSet resultSet = Mockito.mock(ResultSet.class);
+        final ResultSetMetaData resultSetMetaData = 
Mockito.mock(ResultSetMetaData.class);
+        when(resultSet.getMetaData()).thenThrow(new SQLException("test exception")).thenReturn(resultSetMetaData);
+        when(resultSetMetaData.getColumnCount()).thenReturn(1);
+        when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.DECIMAL);
+
+        // when
+        ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, 
recordSchema, 10,0, false);
+        final RecordSchema resultSchema = testSubject.getSchema();
+
+        // then
+        thenAllDataTypesAreString(resultSchema);
+    }
+
+    @Test
     public void testCreateSchemaArrayThrowsNotSupportedException() throws 
SQLException {
         // given
         final List<RecordField> fields = new ArrayList<>();
@@ -272,6 +367,175 @@ public class ResultSetRecordSetTest {
         
assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()),
 resultSchema.getField(0).getDataType());
     }
 
+    @Test
+    public void testArrayTypeWithLogicalTypes() throws SQLException {
+        testArrayType(true);
+    }
+
+    @Test
+    public void testArrayTypeNoLogicalTypes() throws SQLException {
+        testArrayType(false);
+    }
+
+    @Test
+    public void testCreateSchemaWithLogicalTypes() throws SQLException {
+        testCreateSchemaLogicalTypes(true, true);
+    }
+
+    @Test
+    public void testCreateSchemaNoLogicalTypes() throws SQLException {
+        testCreateSchemaLogicalTypes(false, true);
+    }
+
+    @Test
+    public void testCreateSchemaWithLogicalTypesNoInputSchema() throws 
SQLException {
+        testCreateSchemaLogicalTypes(true, false);
+    }
+
+    @Test
+    public void testCreateSchemaNoLogicalTypesNoInputSchema() throws 
SQLException {
+        testCreateSchemaLogicalTypes(false, false);
+    }
+
+    private void testArrayType(boolean useLogicalTypes) throws SQLException {
+        // GIVEN
+        List<ArrayTestData> testData = 
givenArrayTypesThatRequireLogicalTypes();
+        Map<String, DataType> expectedTypes = 
givenExpectedTypesForArrayTypesThatRequireLogicalTypes(useLogicalTypes);
+
+        // WHEN
+        ResultSet resultSet = Mockito.mock(ResultSet.class);
+        ResultSetMetaData resultSetMetaData = 
Mockito.mock(ResultSetMetaData.class);
+        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        when(resultSetMetaData.getColumnCount()).thenReturn(testData.size());
+
+        List<RecordField> fields = 
whenSchemaFieldsAreSetupForArrayType(testData, resultSet, resultSetMetaData);
+        RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, 
recordSchema, 10,0, useLogicalTypes);
+        RecordSchema actualSchema = testSubject.getSchema();
+
+        // THEN
+        thenActualArrayElementTypesMatchExpected(expectedTypes, actualSchema);
+    }
+
+    private void testCreateSchemaLogicalTypes(boolean useLogicalTypes, boolean 
provideInputSchema) throws SQLException {
+        // GIVEN
+        TestColumn[] columns = new TestColumn[]{
+                new TestColumn(1, COLUMN_NAME_DATE, Types.DATE, 
RecordFieldType.DATE.getDataType()),
+                new TestColumn(2, "time", Types.TIME, 
RecordFieldType.TIME.getDataType()),
+                new TestColumn(3, "time_with_timezone", 
Types.TIME_WITH_TIMEZONE, RecordFieldType.TIME.getDataType()),
+                new TestColumn(4, "timestamp", Types.TIMESTAMP, 
RecordFieldType.TIMESTAMP.getDataType()),
+                new TestColumn(5, "timestamp_with_timezone", 
Types.TIMESTAMP_WITH_TIMEZONE, RecordFieldType.TIMESTAMP.getDataType()),
+                new TestColumn(6, COLUMN_NAME_BIG_DECIMAL_1, 
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)),
+                new TestColumn(7, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC, 
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
+                new TestColumn(8, COLUMN_NAME_BIG_DECIMAL_3, 
Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
+                new TestColumn(9, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
+                new TestColumn(10, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL, 
RecordFieldType.DECIMAL.getDecimalDataType(3, 10)),
+        };
+        final RecordSchema recordSchema = provideInputSchema ? 
givenRecordSchema(columns) : null;
+
+        ResultSetMetaData resultSetMetaData = 
Mockito.mock(ResultSetMetaData.class);
+        ResultSet resultSet = Mockito.mock(ResultSet.class);
+
+        RecordSchema expectedSchema = useLogicalTypes ? 
givenRecordSchema(columns) : givenRecordSchemaWithOnlyStringType(columns);
+
+        // WHEN
+        setUpMocks(columns, resultSetMetaData, resultSet);
+
+        ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet, 
recordSchema, 10,0, useLogicalTypes);
+        RecordSchema actualSchema = testSubject.getSchema();
+
+        // THEN
+        thenAllColumnDataTypesAreCorrect(columns, expectedSchema, 
actualSchema);
+    }
+
+    private void setUpMocks(TestColumn[] columns, ResultSetMetaData 
resultSetMetaData, ResultSet resultSet) throws SQLException {
+        when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+        when(resultSetMetaData.getColumnCount()).thenReturn(columns.length);
+
+        int indexOfBigDecimal = -1;
+        int index = 0;
+        for (final TestColumn column : columns) {
+            
when(resultSetMetaData.getColumnLabel(column.getIndex())).thenReturn(column.getColumnName());
+            
when(resultSetMetaData.getColumnName(column.getIndex())).thenReturn(column.getColumnName());
+            
when(resultSetMetaData.getColumnType(column.getIndex())).thenReturn(column.getSqlType());
+
+            if (column.getRecordFieldType() instanceof DecimalDataType) {
+                DecimalDataType ddt = (DecimalDataType) 
column.getRecordFieldType();
+                
when(resultSetMetaData.getPrecision(column.getIndex())).thenReturn(ddt.getPrecision());
+                
when(resultSetMetaData.getScale(column.getIndex())).thenReturn(ddt.getScale());
+            }
+            if (column.getSqlType() == Types.JAVA_OBJECT) {
+                indexOfBigDecimal = index + 1;
+            }
+            ++index;
+        }
+
+        // Big decimal values are necessary in order to determine precision and scale
+        when(resultSet.getBigDecimal(indexOfBigDecimal)).thenReturn(new 
BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
+
+        // This will be handled by a dedicated branch for Java Objects, needs some further details
+        
when(resultSetMetaData.getColumnClassName(indexOfBigDecimal)).thenReturn(BigDecimal.class.getName());
+    }
+
+    private List<RecordField> givenFieldsThatRequireLogicalTypes() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("decimal", 
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+        fields.add(new RecordField("date", 
RecordFieldType.DATE.getDataType()));
+        fields.add(new RecordField("time", 
RecordFieldType.TIME.getDataType()));
+        fields.add(new RecordField("timestamp", 
RecordFieldType.TIMESTAMP.getDataType()));
+        return fields;
+    }
+
+    private RecordSchema givenRecordSchema(TestColumn[] columns) {
+        final List<RecordField> fields = new ArrayList<>(columns.length);
+
+        for (TestColumn column : columns) {
+            fields.add(new RecordField(column.getColumnName(), 
column.getRecordFieldType()));
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private RecordSchema givenRecordSchemaWithOnlyStringType(TestColumn[] 
columns) {
+        final List<RecordField> fields = new ArrayList<>(columns.length);
+
+        for (TestColumn column : columns) {
+            fields.add(new RecordField(column.getColumnName(), 
RecordFieldType.STRING.getDataType()));
+        }
+
+        return new SimpleRecordSchema(fields);
+    }
+
+    private List<ArrayTestData> givenArrayTypesThatRequireLogicalTypes() {
+        List<ArrayTestData> testData = new ArrayList<>();
+        testData.add(new ArrayTestData("arrayBigDecimal",
+                new ResultBigDecimal[]{new ResultBigDecimal(), new 
ResultBigDecimal()}));
+        testData.add(new ArrayTestData("arrayDate",
+                new Date[]{new Date(TIMESTAMP_IN_MILLIS), new 
Date(TIMESTAMP_IN_MILLIS)}));
+        testData.add(new ArrayTestData("arrayTime",
+                new Time[]{new Time(TIMESTAMP_IN_MILLIS), new 
Time(TIMESTAMP_IN_MILLIS)}));
+        testData.add(new ArrayTestData("arrayTimestamp",
+                new Timestamp[]{new Timestamp(TIMESTAMP_IN_MILLIS), new 
Timestamp(TIMESTAMP_IN_MILLIS)}));
+        return testData;
+    }
+
+    private Map<String, DataType> 
givenExpectedTypesForArrayTypesThatRequireLogicalTypes(final boolean 
useLogicalTypes) {
+        Map<String, DataType> expectedTypes = new HashMap<>();
+        if (useLogicalTypes) {
+            expectedTypes.put("arrayBigDecimal", 
RecordFieldType.DECIMAL.getDecimalDataType(ResultBigDecimal.PRECISION, 
ResultBigDecimal.SCALE));
+            expectedTypes.put("arrayDate", RecordFieldType.DATE.getDataType());
+            expectedTypes.put("arrayTime", RecordFieldType.TIME.getDataType());
+            expectedTypes.put("arrayTimestamp", 
RecordFieldType.TIMESTAMP.getDataType());
+        } else {
+            expectedTypes.put("arrayBigDecimal", 
RecordFieldType.STRING.getDataType());
+            expectedTypes.put("arrayDate", 
RecordFieldType.STRING.getDataType());
+            expectedTypes.put("arrayTime", 
RecordFieldType.STRING.getDataType());
+            expectedTypes.put("arrayTimestamp", 
RecordFieldType.STRING.getDataType());
+        }
+        return expectedTypes;
+    }
+
     private ResultSet givenResultSetForArrayThrowsException(boolean 
featureSupported) throws SQLException {
         final ResultSet resultSet = Mockito.mock(ResultSet.class);
         final ResultSetMetaData resultSetMetaData = 
Mockito.mock(ResultSetMetaData.class);
@@ -283,41 +547,234 @@ public class ResultSetRecordSetTest {
         return resultSet;
     }
 
-    private ResultSet givenResultSetForOther() throws SQLException {
+    private ResultSet givenResultSetForOther(List<RecordField> fields) throws 
SQLException {
         final ResultSet resultSet = Mockito.mock(ResultSet.class);
         final ResultSetMetaData resultSetMetaData = 
Mockito.mock(ResultSetMetaData.class);
         when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
-        when(resultSetMetaData.getColumnCount()).thenReturn(1);
-        when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
-        when(resultSetMetaData.getColumnName(1)).thenReturn("column");
-        when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
+        when(resultSetMetaData.getColumnCount()).thenReturn(fields.size());
+        for (int i = 0; i < fields.size(); ++i) {
+            int columnIndex = i + 1;
+            
when(resultSetMetaData.getColumnLabel(columnIndex)).thenReturn(fields.get(i).getFieldName());
+            
when(resultSetMetaData.getColumnName(columnIndex)).thenReturn(fields.get(i).getFieldName());
+            
when(resultSetMetaData.getColumnType(columnIndex)).thenReturn(Types.OTHER);
+        }
         return resultSet;
     }
 
-    private RecordSchema givenRecordSchema() {
-        final List<RecordField> fields = new ArrayList<>();
+    private Record givenInputRecord() {
+        List<RecordField> inputRecordFields = new ArrayList<>(2);
+        inputRecordFields.add(new RecordField("id", 
RecordFieldType.INT.getDataType()));
+        inputRecordFields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        RecordSchema inputRecordSchema = new 
SimpleRecordSchema(inputRecordFields);
+
+        Map<String, Object> inputRecordData = new HashMap<>(2);
+        inputRecordData.put("id", 1);
+        inputRecordData.put("name", "John");
+
+        return new MapRecord(inputRecordSchema, inputRecordData);
+    }
+
+    private List<RecordField> givenFieldsThatAreOfTypeRecord(List<Record> 
concreteRecords) {
+        List<RecordField> fields = new ArrayList<>(concreteRecords.size());
+        int i = 1;
+        for (Record record : concreteRecords) {
+            fields.add(new RecordField("record" + String.valueOf(i), 
RecordFieldType.RECORD.getRecordDataType(record.getSchema())));
+            ++i;
+        }
+        return fields;
+    }
 
-        for (final Object[] column : COLUMNS) {
-            fields.add(new RecordField((String) column[1], (DataType) 
column[3]));
+    private List<RecordField> whenSchemaFieldsAreSetupForArrayType(final 
List<ArrayTestData> testData,
+                                                                   final 
ResultSet resultSet,
+                                                                   final 
ResultSetMetaData resultSetMetaData)
+            throws SQLException {
+        List<RecordField> fields = new ArrayList<>();
+        for (int i = 0; i < testData.size(); ++i) {
+            ArrayTestData testDatum = testData.get(i);
+            int columnIndex = i + 1;
+            ResultSqlArray arrayDummy = Mockito.mock(ResultSqlArray.class);
+            when(arrayDummy.getArray()).thenReturn(testDatum.getTestArray());
+            when(resultSet.getArray(columnIndex)).thenReturn(arrayDummy);
+            
when(resultSetMetaData.getColumnLabel(columnIndex)).thenReturn(testDatum.getFieldName());
+            
when(resultSetMetaData.getColumnType(columnIndex)).thenReturn(Types.ARRAY);
+            fields.add(new RecordField(testDatum.getFieldName(), 
RecordFieldType.ARRAY.getDataType()));
         }
+        return fields;
+    }
 
-        return new SimpleRecordSchema(fields);
+    private void thenAllDataTypesMatchInputFieldType(final List<RecordField> 
inputFields, final RecordSchema resultSchema) {
+        assertEquals("The number of input fields does not match the number of fields in the result schema.", inputFields.size(), resultSchema.getFieldCount());
+        for (int i = 0; i < inputFields.size(); ++i) {
+            assertEquals(inputFields.get(i).getDataType(), 
resultSchema.getField(i).getDataType());
+        }
     }
 
-    private void thenAllColumnDataTypesAreCorrect(final RecordSchema 
resultSchema) {
-        assertNotNull(resultSchema);
+    private void thenAllDataTypesAreString(final RecordSchema resultSchema) {
+        for (int i = 0; i < resultSchema.getFieldCount(); ++i) {
+            assertEquals(RecordFieldType.STRING.getDataType(), 
resultSchema.getField(i).getDataType());
+        }
+    }
 
-        for (final Object[] column : COLUMNS) {
+    private void thenAllColumnDataTypesAreCorrect(TestColumn[] columns, 
RecordSchema expectedSchema, RecordSchema actualSchema) {
+        assertNotNull(actualSchema);
+
+        for (TestColumn column : columns) {
+            int fieldIndex = column.getIndex() - 1;
             // The DECIMAL column with scale larger than precision will not match so verify that instead
-            DataType actualDataType = resultSchema.getField((Integer) 
column[0] - 1).getDataType();
-            DataType expectedDataType = (DataType) column[3];
+            DataType actualDataType = 
actualSchema.getField(fieldIndex).getDataType();
+            DataType expectedDataType = 
expectedSchema.getField(fieldIndex).getDataType();
             if 
(expectedDataType.equals(RecordFieldType.DECIMAL.getDecimalDataType(3, 10))) {
                 DecimalDataType decimalDataType = (DecimalDataType) 
expectedDataType;
                 if (decimalDataType.getScale() > 
decimalDataType.getPrecision()) {
                     expectedDataType = 
RecordFieldType.DECIMAL.getDecimalDataType(decimalDataType.getScale(), 
decimalDataType.getScale());
                 }
             }
-            assertEquals("For column " + column[0] + " the converted type is not matching", expectedDataType, actualDataType);
+            assertEquals("For column " + column.getIndex() + " the converted type is not matching", expectedDataType, actualDataType);
+        }
+    }
+
+    private void thenActualArrayElementTypesMatchExpected(Map<String, 
DataType> expectedTypes, RecordSchema actualSchema) {
+        for (RecordField recordField : actualSchema.getFields()) {
+            if (recordField.getDataType() instanceof ArrayDataType) {
+                ArrayDataType arrayType = (ArrayDataType) 
recordField.getDataType();
+                assertEquals("Array element type for " + 
recordField.getFieldName()
+                                + " is not of expected type " + 
expectedTypes.get(recordField.getFieldName()).toString(),
+                        expectedTypes.get(recordField.getFieldName()), 
arrayType.getElementType());
+            } else {
+                fail("RecordField " + recordField.getFieldName() + " is not instance of ArrayDataType");
+            }
+        }
+    }
+
+    private void thenAllDataTypesAreChoice(final List<RecordField> 
inputFields, final RecordSchema resultSchema) {
+        assertEquals("The number of input fields does not match the number of fields in the result schema.", inputFields.size(), resultSchema.getFieldCount());
+
+        DataType expectedType = getBroadestChoiceDataType();
+        for (int i = 0; i < inputFields.size(); ++i) {
+            assertEquals(expectedType, resultSchema.getField(i).getDataType());
+        }
+    }
+
+    private DataType getBroadestChoiceDataType() {
+        List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, 
RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, 
RecordFieldType.DATE,
+                RecordFieldType.DECIMAL, RecordFieldType.DOUBLE, 
RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, 
RecordFieldType.SHORT, RecordFieldType.STRING,
+                RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
+                .map(RecordFieldType::getDataType)
+                .collect(Collectors.toList());
+        return RecordFieldType.CHOICE.getChoiceDataType(dataTypes);
+    }
+
+    private static class TestColumn {
+        private final int index; // Column indexing starts from 1, not 0.
+        private final String columnName;
+        private final int sqlType;
+        private final DataType recordFieldType;
+
+        public TestColumn(final int index, final String columnName, final int 
sqlType, final DataType recordFieldType) {
+            this.index = index;
+            this.columnName = columnName;
+            this.sqlType = sqlType;
+            this.recordFieldType = recordFieldType;
+        }
+
+        public int getIndex() {
+            return index;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public int getSqlType() {
+            return sqlType;
+        }
+
+        public DataType getRecordFieldType() {
+            return recordFieldType;
+        }
+    }
+
+    private static class ResultSqlArray implements Array {
+
+        @Override
+        public String getBaseTypeName() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public int getBaseType() throws SQLException {
+            return 0;
+        }
+
+        @Override
+        public Object getArray() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Object getArray(Map<String, Class<?>> map) throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Object getArray(long index, int count) throws SQLException {
+            return null;
+        }
+
+        @Override
+        public Object getArray(long index, int count, Map<String, Class<?>> 
map) throws SQLException {
+            return null;
+        }
+
+        @Override
+        public ResultSet getResultSet() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public ResultSet getResultSet(Map<String, Class<?>> map) throws 
SQLException {
+            return null;
+        }
+
+        @Override
+        public ResultSet getResultSet(long index, int count) throws 
SQLException {
+            return null;
+        }
+
+        @Override
+        public ResultSet getResultSet(long index, int count, Map<String, 
Class<?>> map) throws SQLException {
+            return null;
+        }
+
+        @Override
+        public void free() throws SQLException {
+
+        }
+    }
+
+    private static class ResultBigDecimal extends BigDecimal {
+        public static int PRECISION = 3;
+        public static int SCALE = 0;
+        public ResultBigDecimal() {
+            super("123");
+        }
+    }
+
+    private static class ArrayTestData {
+        final private String fieldName;
+        final private Object[] testArray;
+
+        public ArrayTestData(String fieldName, Object[] testArray) {
+            this.fieldName = fieldName;
+            this.testArray = testArray;
+        }
+
+        public String getFieldName() {
+            return fieldName;
+        }
+
+        public Object[] getTestArray() {
+            return testArray;
         }
     }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index 9862383..30e6c51 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -189,6 +189,10 @@ public class JdbcCommon {
             return defaultScale;
         }
 
+        public boolean isUseLogicalTypes() {
+            return useLogicalTypes;
+        }
+
         public static class Builder {
             private String recordName;
             private int maxRows = 0;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index f40acb2..18f7f12 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -70,7 +70,7 @@ public class RecordSqlWriter implements SqlWriter {
             if (fullRecordSet == null) {
                 final Schema avroSchema = JdbcCommon.createSchema(resultSet, 
options);
                 final RecordSchema recordAvroSchema = 
AvroTypeUtil.createSchema(avroSchema);
-                fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, 
recordAvroSchema, callback, options.getDefaultPrecision(), 
options.getDefaultScale());
+                fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, 
recordAvroSchema, callback, options.getDefaultPrecision(), 
options.getDefaultScale(), options.isUseLogicalTypes());
                 writeSchema = 
recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
             }
             recordSet = (maxRowsPerFlowFile > 0) ? 
fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
@@ -135,8 +135,8 @@ public class RecordSqlWriter implements SqlWriter {
         private final ResultSetRowCallback callback;
 
         ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema 
readerSchema, ResultSetRowCallback callback,
-                                       final int defaultPrecision, final int 
defaultScale) throws SQLException {
-            super(rs, readerSchema, defaultPrecision, defaultScale);
+                                       final int defaultPrecision, final int 
defaultScale, final boolean useLogicalTypes) throws SQLException {
+            super(rs, readerSchema, defaultPrecision, defaultScale, 
useLogicalTypes);
             this.callback = callback;
         }
 

Reply via email to