This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 38d26da NIFI-9192: ResultSetRecordSet considers value of
useLogicalTypes flag when determining the object's schema.
38d26da is described below
commit 38d26dafa36888826b1017f9f36725628dc40430
Author: Peter Gyori <[email protected]>
AuthorDate: Tue Sep 14 17:04:54 2021 +0200
NIFI-9192: ResultSetRecordSet considers value of useLogicalTypes flag when
determining the object's schema.
NIFI-9192: ResultSetRecordSet consider value of useLogicalType flag when
sqlType is not handled separately (default) and readerSchema is not null
Signed-off-by: Peter Gyori <[email protected]>
NIFI-9192: Unit tests added to test the use of useLogicalType flag
Signed-off-by: Peter Gyori <[email protected]>
NIFI-9192: ResultSetRecordSet consider not using logical type when sqlType
is OTHER
NIFI-9192: Fixed checkstyle violations
Signed-off-by: Peter Gyori <[email protected]>
NIFI-9192: Renamed JdbcCommon.getUseLogicalTypes to isUseLogicalTypes
Signed-off-by: Peter Gyori <[email protected]>
NIFI-9192: Added unit tests for Array types with and without using logical
types
Signed-off-by: Peter Gyori <[email protected]>
NIFI-9192: Review comments considered and unit tests refactored
NIFI-9192: Added further unit tests
NIFI-9192: Minor modifications based on review comments
Signed-off-by: Peter Gyori <[email protected]>
Signed-off-by: Matthew Burgess <[email protected]>
This closes #5388
---
.../serialization/record/ResultSetRecordSet.java | 173 +++---
.../record/ResultSetRecordSetTest.java | 585 ++++++++++++++++++---
.../java/org/apache/nifi/util/db/JdbcCommon.java | 4 +
.../processors/standard/sql/RecordSqlWriter.java | 6 +-
4 files changed, 641 insertions(+), 127 deletions(-)
diff --git
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 7bf48f0..37d5f19 100644
---
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -68,18 +68,22 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
}
public ResultSetRecordSet(final ResultSet rs, final RecordSchema
readerSchema, final int defaultPrecision, final int defaultScale) throws
SQLException {
+ this(rs, readerSchema, defaultPrecision, defaultScale, true);
+ }
+
+ public ResultSetRecordSet(final ResultSet rs, final RecordSchema
readerSchema, final int defaultPrecision, final int defaultScale, final boolean
useLogicalTypes) throws SQLException {
this.defaultPrecision = defaultPrecision;
this.defaultScale = defaultScale;
this.rs = rs;
this.rsColumnNames = new HashSet<>();
RecordSchema tempSchema;
try {
- tempSchema = createSchema(rs, readerSchema);
+ tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
moreRows = rs.next();
} catch(SQLException se) {
// Tried to create the schema with a ResultSet without calling
next() first (probably for DB2), now try the other way around
moreRows = rs.next();
- tempSchema = createSchema(rs, readerSchema);
+ tempSchema = createSchema(rs, readerSchema, useLogicalTypes);
}
this.schema = tempSchema;
}
@@ -168,7 +172,7 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return value;
}
- private RecordSchema createSchema(final ResultSet rs, final RecordSchema
readerSchema) throws SQLException {
+ private RecordSchema createSchema(final ResultSet rs, final RecordSchema
readerSchema, final boolean useLogicalTypes) throws SQLException {
final ResultSetMetaData metadata = rs.getMetaData();
final int numCols = metadata.getColumnCount();
final List<RecordField> fields = new ArrayList<>(numCols);
@@ -177,7 +181,7 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
final int column = i + 1;
final int sqlType = metadata.getColumnType(column);
- final DataType dataType = getDataType(sqlType, rs, column,
readerSchema);
+ final DataType dataType = getDataType(sqlType, rs, column,
readerSchema, useLogicalTypes);
final String fieldName = metadata.getColumnLabel(column);
final int nullableFlag = metadata.isNullable(column);
@@ -196,56 +200,22 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return new SimpleRecordSchema(fields);
}
- private DataType getDataType(final int sqlType, final ResultSet rs, final
int columnIndex, final RecordSchema readerSchema) throws SQLException {
+ private DataType getDataType(final int sqlType, final ResultSet rs, final
int columnIndex, final RecordSchema readerSchema, final boolean useLogicalTypes)
+ throws SQLException {
switch (sqlType) {
case Types.ARRAY:
- // The JDBC API does not allow us to know what the base type
of an array is through the metadata.
- // As a result, we have to obtain the actual Array for this
record. Once we have this, we can determine
- // the base type. However, if the base type is, itself, an
array, we will simply return a base type of
- // String because otherwise, we need the ResultSet for the
array itself, and many JDBC Drivers do not
- // support calling Array.getResultSet() and will throw an
Exception if that is not supported.
- try {
- final Array array = rs.getArray(columnIndex);
-
- if (array == null) {
- return
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
- }
- final DataType baseType = getArrayBaseType(array);
- return RecordFieldType.ARRAY.getArrayDataType(baseType);
- } catch (SQLFeatureNotSupportedException sfnse) {
- return
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
- }
+ return getArrayDataType(rs, columnIndex, useLogicalTypes);
case Types.BINARY:
case Types.LONGVARBINARY:
case Types.VARBINARY:
return
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
case Types.NUMERIC:
case Types.DECIMAL:
- int decimalPrecision;
- final int decimalScale;
- final int resultSetPrecision =
rs.getMetaData().getPrecision(columnIndex);
- final int resultSetScale =
rs.getMetaData().getScale(columnIndex);
- if (rs.getMetaData().getPrecision(columnIndex) > 0) {
- // When database returns a certain precision, we can rely
on that.
- decimalPrecision = resultSetPrecision;
- //For the float data type Oracle return decimalScale < 0
which cause is not expected to org.apache.avro.LogicalTypes
- //Hence falling back to default scale if decimalScale < 0
- decimalScale = resultSetScale > 0 ? resultSetScale :
defaultScale;
+ if (useLogicalTypes) {
+ return getDecimalDataType(rs, columnIndex);
} else {
- // If not, use default precision.
- decimalPrecision = defaultPrecision;
- // Oracle returns precision=0, scale=-127 for variable
scale value such as ROWNUM or function result.
- // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty
makes it to return scale=0 instead.
- // Queries for example, 'SELECT 1.23 as v from DUAL' can
be problematic because it can't be mapped with decimal with scale=0.
- // Default scale is used to preserve decimals in such case.
- decimalScale = resultSetScale > 0 ? resultSetScale :
defaultScale;
+ return RecordFieldType.STRING.getDataType();
}
- // Scale can be bigger than precision in some cases (Oracle,
e.g.) If this is the case, assume precision refers to the number of
- // decimal digits and thus precision = scale
- if (decimalScale > decimalPrecision) {
- decimalPrecision = decimalScale;
- }
- return
RecordFieldType.DECIMAL.getDecimalDataType(decimalPrecision, decimalScale);
case Types.OTHER: {
// If we have no records to inspect, we can't really know its
schema so we simply use the default data type.
if (rs.isAfterLast()) {
@@ -257,10 +227,14 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
if (readerSchema != null) {
Optional<DataType> dataType =
readerSchema.getDataType(columnName);
if (dataType.isPresent()) {
- return dataType.get();
+ return determineDataTypeToReturn(dataType.get(),
useLogicalTypes);
}
}
+ if (!useLogicalTypes) {
+ return RecordFieldType.STRING.getDataType();
+ }
+
final Object obj = rs.getObject(columnIndex);
if (!(obj instanceof Record)) {
final List<DataType> dataTypes =
Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN,
RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
@@ -282,11 +256,11 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
if (readerSchema != null) {
Optional<DataType> dataType =
readerSchema.getDataType(columnName);
if (dataType.isPresent()) {
- return dataType.get();
+ return determineDataTypeToReturn(dataType.get(),
useLogicalTypes);
}
}
- final RecordFieldType fieldType = getFieldType(sqlType,
rs.getMetaData().getColumnClassName(columnIndex));
+ final RecordFieldType fieldType = getFieldType(sqlType,
rs.getMetaData().getColumnClassName(columnIndex), useLogicalTypes);
if (RecordFieldType.DECIMAL.equals(fieldType)) {
final BigDecimal bigDecimalValue =
rs.getBigDecimal(columnIndex);
@@ -298,7 +272,67 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
}
}
- private static DataType getArrayBaseType(final Array array) throws
SQLException {
+ private DataType determineDataTypeToReturn(final DataType dataType, final
boolean useLogicalTypes) {
+ RecordFieldType fieldType = dataType.getFieldType();
+ if (!useLogicalTypes
+ && (fieldType == RecordFieldType.DECIMAL
+ || fieldType == RecordFieldType.DATE
+ || fieldType == RecordFieldType.TIME
+ || fieldType == RecordFieldType.TIMESTAMP)) {
+ return RecordFieldType.STRING.getDataType();
+ } else {
+ return dataType;
+ }
+ }
+
+ private DataType getArrayDataType(final ResultSet rs, final int
columnIndex, final boolean useLogicalTypes) throws SQLException {
+ // The JDBC API does not allow us to know what the base type of an
array is through the metadata.
+ // As a result, we have to obtain the actual Array for this record.
Once we have this, we can determine
+ // the base type. However, if the base type is, itself, an array, we
will simply return a base type of
+ // String because otherwise, we need the ResultSet for the array
itself, and many JDBC Drivers do not
+ // support calling Array.getResultSet() and will throw an Exception if
that is not supported.
+ try {
+ final Array array = rs.getArray(columnIndex);
+
+ if (array == null) {
+ return
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+ final DataType baseType = getArrayBaseType(array, useLogicalTypes);
+ return RecordFieldType.ARRAY.getArrayDataType(baseType);
+ } catch (SQLFeatureNotSupportedException sfnse) {
+ return
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ }
+ }
+
+ private DataType getDecimalDataType(final ResultSet rs, final int
columnIndex) throws SQLException {
+ int decimalPrecision;
+ final int decimalScale;
+ final int resultSetPrecision =
rs.getMetaData().getPrecision(columnIndex);
+ final int resultSetScale = rs.getMetaData().getScale(columnIndex);
+ if (resultSetPrecision > 0) {
+ // When database returns a certain precision, we can rely on that.
+ decimalPrecision = resultSetPrecision;
+ //For the float data type Oracle return decimalScale < 0 which
cause is not expected to org.apache.avro.LogicalTypes
+ //Hence falling back to default scale if decimalScale < 0
+ decimalScale = resultSetScale > 0 ? resultSetScale : defaultScale;
+ } else {
+ // If not, use default precision.
+ decimalPrecision = defaultPrecision;
+ // Oracle returns precision=0, scale=-127 for variable scale value
such as ROWNUM or function result.
+ // Specifying 'oracle.jdbc.J2EE13Compliant' SystemProperty makes
it to return scale=0 instead.
+ // Queries for example, 'SELECT 1.23 as v from DUAL' can be
problematic because it can't be mapped with decimal with scale=0.
+ // Default scale is used to preserve decimals in such case.
+ decimalScale = resultSetScale > 0 ? resultSetScale : defaultScale;
+ }
+ // Scale can be bigger than precision in some cases (Oracle, e.g.) If
this is the case, assume precision refers to the number of
+ // decimal digits and thus precision = scale
+ if (decimalScale > decimalPrecision) {
+ decimalPrecision = decimalScale;
+ }
+ return RecordFieldType.DECIMAL.getDecimalDataType(decimalPrecision,
decimalScale);
+ }
+
+ private static DataType getArrayBaseType(final Array array, final boolean
useLogicalTypes) throws SQLException {
final Object arrayValue = array.getArray();
if (arrayValue == null) {
return RecordFieldType.STRING.getDataType();
@@ -367,8 +401,12 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return RecordFieldType.DOUBLE.getDataType();
}
if (valueToLookAt instanceof BigDecimal) {
- final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
- return
RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(),
bigDecimal.scale());
+ if (useLogicalTypes) {
+ final BigDecimal bigDecimal = (BigDecimal) valueToLookAt;
+ return
RecordFieldType.DECIMAL.getDecimalDataType(bigDecimal.precision(),
bigDecimal.scale());
+ } else {
+ return RecordFieldType.STRING.getDataType();
+ }
}
if (valueToLookAt instanceof Boolean) {
return RecordFieldType.BOOLEAN.getDataType();
@@ -383,13 +421,13 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return RecordFieldType.INT.getDataType();
}
if (valueToLookAt instanceof java.sql.Time) {
- return RecordFieldType.TIME.getDataType();
+ return getDataType(RecordFieldType.TIME, useLogicalTypes);
}
if (valueToLookAt instanceof java.sql.Date) {
- return RecordFieldType.DATE.getDataType();
+ return getDataType(RecordFieldType.DATE, useLogicalTypes);
}
if (valueToLookAt instanceof java.sql.Timestamp) {
- return RecordFieldType.TIMESTAMP.getDataType();
+ return getDataType(RecordFieldType.TIMESTAMP, useLogicalTypes);
}
if (valueToLookAt instanceof Record) {
final Record record = (Record) valueToLookAt;
@@ -400,8 +438,15 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return RecordFieldType.STRING.getDataType();
}
+ private static DataType getDataType(final RecordFieldType recordFieldType,
final boolean useLogicalTypes) {
+ if (useLogicalTypes) {
+ return recordFieldType.getDataType();
+ } else {
+ return RecordFieldType.STRING.getDataType();
+ }
+ }
- private static RecordFieldType getFieldType(final int sqlType, final
String valueClassName) {
+ private static RecordFieldType getFieldType(final int sqlType, final
String valueClassName, final boolean useLogicalTypes) {
switch (sqlType) {
case Types.BIGINT:
case Types.ROWID:
@@ -412,10 +457,10 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
case Types.CHAR:
return RecordFieldType.CHAR;
case Types.DATE:
- return RecordFieldType.DATE;
+ return getRecordFieldType(RecordFieldType.DATE,
useLogicalTypes);
case Types.NUMERIC:
case Types.DECIMAL:
- return RecordFieldType.DECIMAL;
+ return getRecordFieldType(RecordFieldType.DECIMAL,
useLogicalTypes);
case Types.DOUBLE:
case Types.REAL:
return RecordFieldType.DOUBLE;
@@ -446,7 +491,7 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return RecordFieldType.LONG;
}
if (DATE_CLASS_NAME.equals(valueClassName)) {
- return RecordFieldType.DATE;
+ return getRecordFieldType(RecordFieldType.DATE,
useLogicalTypes);
}
if (FLOAT_CLASS_NAME.equals(valueClassName)) {
return RecordFieldType.FLOAT;
@@ -455,20 +500,28 @@ public class ResultSetRecordSet implements RecordSet,
Closeable {
return RecordFieldType.DOUBLE;
}
if (BIGDECIMAL_CLASS_NAME.equals(valueClassName)) {
- return RecordFieldType.DECIMAL;
+ return getRecordFieldType(RecordFieldType.DECIMAL,
useLogicalTypes);
}
return RecordFieldType.RECORD;
case Types.TIME:
case Types.TIME_WITH_TIMEZONE:
- return RecordFieldType.TIME;
+ return getRecordFieldType(RecordFieldType.TIME,
useLogicalTypes);
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
case -101: // Oracle's TIMESTAMP WITH TIME ZONE
case -102: // Oracle's TIMESTAMP WITH LOCAL TIME ZONE
- return RecordFieldType.TIMESTAMP;
+ return getRecordFieldType(RecordFieldType.TIMESTAMP,
useLogicalTypes);
}
return RecordFieldType.STRING;
}
+
+ private static RecordFieldType getRecordFieldType(final RecordFieldType
recordFieldType, final boolean useLogicalTypes) {
+ if (useLogicalTypes) {
+ return recordFieldType;
+ } else {
+ return RecordFieldType.STRING;
+ }
+ }
}
diff --git
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
index 3cefc4e..10fd23f 100644
---
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
+++
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/ResultSetRecordSetTest.java
@@ -17,6 +17,7 @@
package org.apache.nifi.serialization.record;
import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.junit.Before;
import org.junit.Test;
@@ -27,21 +28,30 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.math.BigDecimal;
+import java.sql.Array;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Time;
+import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -66,26 +76,27 @@ public class ResultSetRecordSetTest {
private static final String COLUMN_NAME_BIG_DECIMAL_4 = "bigDecimal4";
private static final String COLUMN_NAME_BIG_DECIMAL_5 = "bigDecimal5";
- private static final Object[][] COLUMNS = new Object[][] {
- // column number; column label / name / schema field; column type;
schema data type;
- {1, COLUMN_NAME_VARCHAR, Types.VARCHAR,
RecordFieldType.STRING.getDataType()},
- {2, COLUMN_NAME_BIGINT, Types.BIGINT,
RecordFieldType.LONG.getDataType()},
- {3, COLUMN_NAME_ROWID, Types.ROWID,
RecordFieldType.LONG.getDataType()},
- {4, COLUMN_NAME_BIT, Types.BIT,
RecordFieldType.BOOLEAN.getDataType()},
- {5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN,
RecordFieldType.BOOLEAN.getDataType()},
- {6, COLUMN_NAME_CHAR, Types.CHAR,
RecordFieldType.CHAR.getDataType()},
- {7, COLUMN_NAME_DATE, Types.DATE,
RecordFieldType.DATE.getDataType()},
- {8, COLUMN_NAME_INTEGER, Types.INTEGER,
RecordFieldType.INT.getDataType()},
- {9, COLUMN_NAME_DOUBLE, Types.DOUBLE,
RecordFieldType.DOUBLE.getDataType()},
- {10, COLUMN_NAME_REAL, Types.REAL,
RecordFieldType.DOUBLE.getDataType()},
- {11, COLUMN_NAME_FLOAT, Types.FLOAT,
RecordFieldType.FLOAT.getDataType()},
- {12, COLUMN_NAME_SMALLINT, Types.SMALLINT,
RecordFieldType.SHORT.getDataType()},
- {13, COLUMN_NAME_TINYINT, Types.TINYINT,
RecordFieldType.BYTE.getDataType()},
- {14, COLUMN_NAME_BIG_DECIMAL_1,
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)},
- {15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC,
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)},
- {16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT,
RecordFieldType.DECIMAL.getDecimalDataType(501, 1)},
- {17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)},
- {18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(3, 10)},
+ private static final long TIMESTAMP_IN_MILLIS = 1631809132516L;
+
+ private static final TestColumn[] COLUMNS = new TestColumn[] {
+ new TestColumn(1, COLUMN_NAME_VARCHAR, Types.VARCHAR,
RecordFieldType.STRING.getDataType()),
+ new TestColumn(2, COLUMN_NAME_BIGINT, Types.BIGINT,
RecordFieldType.LONG.getDataType()),
+ new TestColumn(3, COLUMN_NAME_ROWID, Types.ROWID,
RecordFieldType.LONG.getDataType()),
+ new TestColumn(4, COLUMN_NAME_BIT, Types.BIT,
RecordFieldType.BOOLEAN.getDataType()),
+ new TestColumn(5, COLUMN_NAME_BOOLEAN, Types.BOOLEAN,
RecordFieldType.BOOLEAN.getDataType()),
+ new TestColumn(6, COLUMN_NAME_CHAR, Types.CHAR,
RecordFieldType.CHAR.getDataType()),
+ new TestColumn(7, COLUMN_NAME_DATE, Types.DATE,
RecordFieldType.DATE.getDataType()),
+ new TestColumn(8, COLUMN_NAME_INTEGER, Types.INTEGER,
RecordFieldType.INT.getDataType()),
+ new TestColumn(9, COLUMN_NAME_DOUBLE, Types.DOUBLE,
RecordFieldType.DOUBLE.getDataType()),
+ new TestColumn(10, COLUMN_NAME_REAL, Types.REAL,
RecordFieldType.DOUBLE.getDataType()),
+ new TestColumn(11, COLUMN_NAME_FLOAT, Types.FLOAT,
RecordFieldType.FLOAT.getDataType()),
+ new TestColumn(12, COLUMN_NAME_SMALLINT, Types.SMALLINT,
RecordFieldType.SHORT.getDataType()),
+ new TestColumn(13, COLUMN_NAME_TINYINT, Types.TINYINT,
RecordFieldType.BYTE.getDataType()),
+ new TestColumn(14, COLUMN_NAME_BIG_DECIMAL_1,
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)),
+ new TestColumn(15, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC,
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
+ new TestColumn(16, COLUMN_NAME_BIG_DECIMAL_3, Types.JAVA_OBJECT,
RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
+ new TestColumn(17, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
+ new TestColumn(18, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(3, 10))
};
@Mock
@@ -96,49 +107,34 @@ public class ResultSetRecordSetTest {
@Before
public void setUp() throws SQLException {
- when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
- when(resultSetMetaData.getColumnCount()).thenReturn(COLUMNS.length);
-
- for (final Object[] column : COLUMNS) {
- when(resultSetMetaData.getColumnLabel((Integer)
column[0])).thenReturn((String) (column[1]));
- when(resultSetMetaData.getColumnName((Integer)
column[0])).thenReturn((String) column[1]);
- when(resultSetMetaData.getColumnType((Integer)
column[0])).thenReturn((Integer) column[2]);
-
- if(column[3] instanceof DecimalDataType) {
- DecimalDataType ddt = (DecimalDataType)column[3];
- when(resultSetMetaData.getPrecision((Integer)
column[0])).thenReturn(ddt.getPrecision());
- when(resultSetMetaData.getScale((Integer)
column[0])).thenReturn(ddt.getScale());
- }
- }
-
- // Big decimal values are necessary in order to determine precision
and scale
- when(resultSet.getBigDecimal(16)).thenReturn(new
BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
-
- // This will be handled by a dedicated branch for Java Objects, needs
some further details
-
when(resultSetMetaData.getColumnClassName(16)).thenReturn(BigDecimal.class.getName());
+ setUpMocks(COLUMNS, resultSetMetaData, resultSet);
}
@Test
public void testCreateSchema() throws SQLException {
// given
- final RecordSchema recordSchema = givenRecordSchema();
+ final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
+ final RecordSchema expectedSchema = givenRecordSchema(COLUMNS);
// when
final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, recordSchema);
- final RecordSchema resultSchema = testSubject.getSchema();
+ final RecordSchema actualSchema = testSubject.getSchema();
// then
- thenAllColumnDataTypesAreCorrect(resultSchema);
+ thenAllColumnDataTypesAreCorrect(COLUMNS, expectedSchema,
actualSchema);
}
@Test
public void testCreateSchemaWhenNoRecordSchema() throws SQLException {
+ // given
+ final RecordSchema expectedSchema = givenRecordSchema(COLUMNS);
+
// when
final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, null);
- final RecordSchema resultSchema = testSubject.getSchema();
+ final RecordSchema actualSchema = testSubject.getSchema();
// then
- thenAllColumnDataTypesAreCorrect(resultSchema);
+ thenAllColumnDataTypesAreCorrect(COLUMNS, expectedSchema,
actualSchema);
}
@Test
@@ -147,7 +143,7 @@ public class ResultSetRecordSetTest {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("column",
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
- final ResultSet resultSet = givenResultSetForOther();
+ final ResultSet resultSet = givenResultSetForOther(fields);
// when
final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, recordSchema);
@@ -158,9 +154,86 @@ public class ResultSetRecordSetTest {
}
@Test
+ public void testCreateSchemaWhenOtherTypeUsingLogicalTypes() throws
SQLException {
+ // given
+ final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+ final ResultSet resultSet = givenResultSetForOther(fields);
+
+ // when
+ final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, recordSchema, 10, 0, true);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesMatchInputFieldType(fields, resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherTypeAndNoLogicalTypes() throws
SQLException {
+ // given
+ final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+ final ResultSet resultSet = givenResultSetForOther(fields);
+
+ // when
+ final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, recordSchema, 10, 0, false);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesAreString(resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherTypeUsingLogicalTypesNoSchema()
throws SQLException {
+ // given
+ final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+ final ResultSet resultSet = givenResultSetForOther(fields);
+
+ // when
+ final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, null, 10, 0, true);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesAreChoice(fields, resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherTypeAndNoLogicalTypesNoSchema()
throws SQLException {
+ // given
+ final List<RecordField> fields = givenFieldsThatRequireLogicalTypes();
+ final ResultSet resultSet = givenResultSetForOther(fields);
+
+ // when
+ final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, null, 10, 0, false);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesAreString(resultSchema);
+ }
+
+ @Test
+ public void testCreateSchemaWhenOtherTypeUsingLogicalTypesWithRecord()
throws SQLException {
+ // given
+ final Record inputRecord = givenInputRecord(); // The field's type is
going to be RECORD (there is a record within a record)
+ final List<RecordField> fields =
givenFieldsThatAreOfTypeRecord(Arrays.asList(inputRecord));
+ final ResultSet resultSet = givenResultSetForOther(fields);
+
+ when(resultSet.getObject(1)).thenReturn(inputRecord);
+
+ // when
+ final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, null, 10, 0, true);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesMatchInputFieldType(fields, resultSchema);
+ }
+
+ @Test
public void testCreateSchemaWhenOtherTypeWithoutSchema() throws
SQLException {
// given
- final ResultSet resultSet = givenResultSetForOther();
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("column",
RecordFieldType.STRING.getDataType()));
+ final ResultSet resultSet = givenResultSetForOther(fields);
// when
final ResultSetRecordSet testSubject = new
ResultSetRecordSet(resultSet, null);
@@ -173,7 +246,7 @@ public class ResultSetRecordSetTest {
@Test
public void testCreateRecord() throws SQLException {
// given
- final RecordSchema recordSchema = givenRecordSchema();
+ final RecordSchema recordSchema = givenRecordSchema(COLUMNS);
LocalDate testDate = LocalDate.of(2021, 1, 26);
@@ -257,6 +330,28 @@ public class ResultSetRecordSetTest {
}
@Test
+ public void
testCreateSchemaThrowsExceptionSchemaCreationStillCalledConsideringLogicalTypeFlag()
throws SQLException {
+ // given
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("column",
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final ResultSet resultSet = Mockito.mock(ResultSet.class);
+ final ResultSetMetaData resultSetMetaData =
Mockito.mock(ResultSetMetaData.class);
+ when(resultSet.getMetaData()).thenThrow(new SQLException("test
exception")).thenReturn(resultSetMetaData);
+ when(resultSetMetaData.getColumnCount()).thenReturn(1);
+ when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
+ when(resultSetMetaData.getColumnType(1)).thenReturn(Types.DECIMAL);
+
+ // when
+ ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet,
recordSchema, 10,0, false);
+ final RecordSchema resultSchema = testSubject.getSchema();
+
+ // then
+ thenAllDataTypesAreString(resultSchema);
+ }
+
+ @Test
public void testCreateSchemaArrayThrowsNotSupportedException() throws
SQLException {
// given
final List<RecordField> fields = new ArrayList<>();
@@ -272,6 +367,175 @@ public class ResultSetRecordSetTest {
assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()),
resultSchema.getField(0).getDataType());
}
+ @Test
+ public void testArrayTypeWithLogicalTypes() throws SQLException {
+ testArrayType(true);
+ }
+
+ @Test
+ public void testArrayTypeNoLogicalTypes() throws SQLException {
+ testArrayType(false);
+ }
+
+ @Test
+ public void testCreateSchemaWithLogicalTypes() throws SQLException {
+ testCreateSchemaLogicalTypes(true, true);
+ }
+
+ @Test
+ public void testCreateSchemaNoLogicalTypes() throws SQLException {
+ testCreateSchemaLogicalTypes(false, true);
+ }
+
+ @Test
+ public void testCreateSchemaWithLogicalTypesNoInputSchema() throws
SQLException {
+ testCreateSchemaLogicalTypes(true, false);
+ }
+
+ @Test
+ public void testCreateSchemaNoLogicalTypesNoInputSchema() throws
SQLException {
+ testCreateSchemaLogicalTypes(false, false);
+ }
+
+ private void testArrayType(boolean useLogicalTypes) throws SQLException {
+ // GIVEN
+ List<ArrayTestData> testData =
givenArrayTypesThatRequireLogicalTypes();
+ Map<String, DataType> expectedTypes =
givenExpectedTypesForArrayTypesThatRequireLogicalTypes(useLogicalTypes);
+
+ // WHEN
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+ ResultSetMetaData resultSetMetaData =
Mockito.mock(ResultSetMetaData.class);
+ when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ when(resultSetMetaData.getColumnCount()).thenReturn(testData.size());
+
+ List<RecordField> fields =
whenSchemaFieldsAreSetupForArrayType(testData, resultSet, resultSetMetaData);
+ RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet,
recordSchema, 10,0, useLogicalTypes);
+ RecordSchema actualSchema = testSubject.getSchema();
+
+ // THEN
+ thenActualArrayElementTypesMatchExpected(expectedTypes, actualSchema);
+ }
+
+ private void testCreateSchemaLogicalTypes(boolean useLogicalTypes, boolean
provideInputSchema) throws SQLException {
+ // GIVEN
+ TestColumn[] columns = new TestColumn[]{
+ new TestColumn(1, COLUMN_NAME_DATE, Types.DATE,
RecordFieldType.DATE.getDataType()),
+ new TestColumn(2, "time", Types.TIME,
RecordFieldType.TIME.getDataType()),
+ new TestColumn(3, "time_with_timezone",
Types.TIME_WITH_TIMEZONE, RecordFieldType.TIME.getDataType()),
+ new TestColumn(4, "timestamp", Types.TIMESTAMP,
RecordFieldType.TIMESTAMP.getDataType()),
+ new TestColumn(5, "timestamp_with_timezone",
Types.TIMESTAMP_WITH_TIMEZONE, RecordFieldType.TIMESTAMP.getDataType()),
+ new TestColumn(6, COLUMN_NAME_BIG_DECIMAL_1,
Types.DECIMAL,RecordFieldType.DECIMAL.getDecimalDataType(7, 3)),
+ new TestColumn(7, COLUMN_NAME_BIG_DECIMAL_2, Types.NUMERIC,
RecordFieldType.DECIMAL.getDecimalDataType(4, 0)),
+ new TestColumn(8, COLUMN_NAME_BIG_DECIMAL_3,
Types.JAVA_OBJECT, RecordFieldType.DECIMAL.getDecimalDataType(501, 1)),
+ new TestColumn(9, COLUMN_NAME_BIG_DECIMAL_4, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(10, 3)),
+ new TestColumn(10, COLUMN_NAME_BIG_DECIMAL_5, Types.DECIMAL,
RecordFieldType.DECIMAL.getDecimalDataType(3, 10)),
+ };
+ final RecordSchema recordSchema = provideInputSchema ?
givenRecordSchema(columns) : null;
+
+ ResultSetMetaData resultSetMetaData =
Mockito.mock(ResultSetMetaData.class);
+ ResultSet resultSet = Mockito.mock(ResultSet.class);
+
+ RecordSchema expectedSchema = useLogicalTypes ?
givenRecordSchema(columns) : givenRecordSchemaWithOnlyStringType(columns);
+
+ // WHEN
+ setUpMocks(columns, resultSetMetaData, resultSet);
+
+ ResultSetRecordSet testSubject = new ResultSetRecordSet(resultSet,
recordSchema, 10,0, useLogicalTypes);
+ RecordSchema actualSchema = testSubject.getSchema();
+
+ // THEN
+ thenAllColumnDataTypesAreCorrect(columns, expectedSchema,
actualSchema);
+ }
+
+ private void setUpMocks(TestColumn[] columns, ResultSetMetaData
resultSetMetaData, ResultSet resultSet) throws SQLException {
+ when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
+ when(resultSetMetaData.getColumnCount()).thenReturn(columns.length);
+
+ int indexOfBigDecimal = -1;
+ int index = 0;
+ for (final TestColumn column : columns) {
+
when(resultSetMetaData.getColumnLabel(column.getIndex())).thenReturn(column.getColumnName());
+
when(resultSetMetaData.getColumnName(column.getIndex())).thenReturn(column.getColumnName());
+
when(resultSetMetaData.getColumnType(column.getIndex())).thenReturn(column.getSqlType());
+
+ if (column.getRecordFieldType() instanceof DecimalDataType) {
+ DecimalDataType ddt = (DecimalDataType)
column.getRecordFieldType();
+
when(resultSetMetaData.getPrecision(column.getIndex())).thenReturn(ddt.getPrecision());
+
when(resultSetMetaData.getScale(column.getIndex())).thenReturn(ddt.getScale());
+ }
+ if (column.getSqlType() == Types.JAVA_OBJECT) {
+ indexOfBigDecimal = index + 1;
+ }
+ ++index;
+ }
+
+ // Big decimal values are necessary in order to determine precision
and scale
+ when(resultSet.getBigDecimal(indexOfBigDecimal)).thenReturn(new
BigDecimal(String.join("", Collections.nCopies(500, "1")) + ".1"));
+
+ // This will be handled by a dedicated branch for Java Objects, needs
some further details
+
when(resultSetMetaData.getColumnClassName(indexOfBigDecimal)).thenReturn(BigDecimal.class.getName());
+ }
+
+ private List<RecordField> givenFieldsThatRequireLogicalTypes() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("decimal",
RecordFieldType.DECIMAL.getDecimalDataType(30, 10)));
+ fields.add(new RecordField("date",
RecordFieldType.DATE.getDataType()));
+ fields.add(new RecordField("time",
RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("timestamp",
RecordFieldType.TIMESTAMP.getDataType()));
+ return fields;
+ }
+
+ private RecordSchema givenRecordSchema(TestColumn[] columns) {
+ final List<RecordField> fields = new ArrayList<>(columns.length);
+
+ for (TestColumn column : columns) {
+ fields.add(new RecordField(column.getColumnName(),
column.getRecordFieldType()));
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private RecordSchema givenRecordSchemaWithOnlyStringType(TestColumn[]
columns) {
+ final List<RecordField> fields = new ArrayList<>(columns.length);
+
+ for (TestColumn column : columns) {
+ fields.add(new RecordField(column.getColumnName(),
RecordFieldType.STRING.getDataType()));
+ }
+
+ return new SimpleRecordSchema(fields);
+ }
+
+ private List<ArrayTestData> givenArrayTypesThatRequireLogicalTypes() {
+ List<ArrayTestData> testData = new ArrayList<>();
+ testData.add(new ArrayTestData("arrayBigDecimal",
+ new ResultBigDecimal[]{new ResultBigDecimal(), new
ResultBigDecimal()}));
+ testData.add(new ArrayTestData("arrayDate",
+ new Date[]{new Date(TIMESTAMP_IN_MILLIS), new
Date(TIMESTAMP_IN_MILLIS)}));
+ testData.add(new ArrayTestData("arrayTime",
+ new Time[]{new Time(TIMESTAMP_IN_MILLIS), new
Time(TIMESTAMP_IN_MILLIS)}));
+ testData.add(new ArrayTestData("arrayTimestamp",
+ new Timestamp[]{new Timestamp(TIMESTAMP_IN_MILLIS), new
Timestamp(TIMESTAMP_IN_MILLIS)}));
+ return testData;
+ }
+
+ private Map<String, DataType>
givenExpectedTypesForArrayTypesThatRequireLogicalTypes(final boolean
useLogicalTypes) {
+ Map<String, DataType> expectedTypes = new HashMap<>();
+ if (useLogicalTypes) {
+ expectedTypes.put("arrayBigDecimal",
RecordFieldType.DECIMAL.getDecimalDataType(ResultBigDecimal.PRECISION,
ResultBigDecimal.SCALE));
+ expectedTypes.put("arrayDate", RecordFieldType.DATE.getDataType());
+ expectedTypes.put("arrayTime", RecordFieldType.TIME.getDataType());
+ expectedTypes.put("arrayTimestamp",
RecordFieldType.TIMESTAMP.getDataType());
+ } else {
+ expectedTypes.put("arrayBigDecimal",
RecordFieldType.STRING.getDataType());
+ expectedTypes.put("arrayDate",
RecordFieldType.STRING.getDataType());
+ expectedTypes.put("arrayTime",
RecordFieldType.STRING.getDataType());
+ expectedTypes.put("arrayTimestamp",
RecordFieldType.STRING.getDataType());
+ }
+ return expectedTypes;
+ }
+
private ResultSet givenResultSetForArrayThrowsException(boolean
featureSupported) throws SQLException {
final ResultSet resultSet = Mockito.mock(ResultSet.class);
final ResultSetMetaData resultSetMetaData =
Mockito.mock(ResultSetMetaData.class);
@@ -283,41 +547,234 @@ public class ResultSetRecordSetTest {
return resultSet;
}
- private ResultSet givenResultSetForOther() throws SQLException {
+ private ResultSet givenResultSetForOther(List<RecordField> fields) throws
SQLException {
final ResultSet resultSet = Mockito.mock(ResultSet.class);
final ResultSetMetaData resultSetMetaData =
Mockito.mock(ResultSetMetaData.class);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
- when(resultSetMetaData.getColumnCount()).thenReturn(1);
- when(resultSetMetaData.getColumnLabel(1)).thenReturn("column");
- when(resultSetMetaData.getColumnName(1)).thenReturn("column");
- when(resultSetMetaData.getColumnType(1)).thenReturn(Types.OTHER);
+ when(resultSetMetaData.getColumnCount()).thenReturn(fields.size());
+ for (int i = 0; i < fields.size(); ++i) {
+ int columnIndex = i + 1;
+
when(resultSetMetaData.getColumnLabel(columnIndex)).thenReturn(fields.get(i).getFieldName());
+
when(resultSetMetaData.getColumnName(columnIndex)).thenReturn(fields.get(i).getFieldName());
+
when(resultSetMetaData.getColumnType(columnIndex)).thenReturn(Types.OTHER);
+ }
return resultSet;
}
- private RecordSchema givenRecordSchema() {
- final List<RecordField> fields = new ArrayList<>();
+ private Record givenInputRecord() {
+ List<RecordField> inputRecordFields = new ArrayList<>(2);
+ inputRecordFields.add(new RecordField("id",
RecordFieldType.INT.getDataType()));
+ inputRecordFields.add(new RecordField("name",
RecordFieldType.STRING.getDataType()));
+ RecordSchema inputRecordSchema = new
SimpleRecordSchema(inputRecordFields);
+
+ Map<String, Object> inputRecordData = new HashMap<>(2);
+ inputRecordData.put("id", 1);
+ inputRecordData.put("name", "John");
+
+ return new MapRecord(inputRecordSchema, inputRecordData);
+ }
+
+ private List<RecordField> givenFieldsThatAreOfTypeRecord(List<Record>
concreteRecords) {
+ List<RecordField> fields = new ArrayList<>(concreteRecords.size());
+ int i = 1;
+ for (Record record : concreteRecords) {
+ fields.add(new RecordField("record" + String.valueOf(i),
RecordFieldType.RECORD.getRecordDataType(record.getSchema())));
+ ++i;
+ }
+ return fields;
+ }
- for (final Object[] column : COLUMNS) {
- fields.add(new RecordField((String) column[1], (DataType)
column[3]));
+ private List<RecordField> whenSchemaFieldsAreSetupForArrayType(final
List<ArrayTestData> testData,
+ final
ResultSet resultSet,
+ final
ResultSetMetaData resultSetMetaData)
+ throws SQLException {
+ List<RecordField> fields = new ArrayList<>();
+ for (int i = 0; i < testData.size(); ++i) {
+ ArrayTestData testDatum = testData.get(i);
+ int columnIndex = i + 1;
+ ResultSqlArray arrayDummy = Mockito.mock(ResultSqlArray.class);
+ when(arrayDummy.getArray()).thenReturn(testDatum.getTestArray());
+ when(resultSet.getArray(columnIndex)).thenReturn(arrayDummy);
+
when(resultSetMetaData.getColumnLabel(columnIndex)).thenReturn(testDatum.getFieldName());
+
when(resultSetMetaData.getColumnType(columnIndex)).thenReturn(Types.ARRAY);
+ fields.add(new RecordField(testDatum.getFieldName(),
RecordFieldType.ARRAY.getDataType()));
}
+ return fields;
+ }
- return new SimpleRecordSchema(fields);
+ private void thenAllDataTypesMatchInputFieldType(final List<RecordField>
inputFields, final RecordSchema resultSchema) {
+ assertEquals("The number of input fields does not match the number of
fields in the result schema.", inputFields.size(),
resultSchema.getFieldCount());
+ for (int i = 0; i < inputFields.size(); ++i) {
+ assertEquals(inputFields.get(i).getDataType(),
resultSchema.getField(i).getDataType());
+ }
}
- private void thenAllColumnDataTypesAreCorrect(final RecordSchema
resultSchema) {
- assertNotNull(resultSchema);
+ private void thenAllDataTypesAreString(final RecordSchema resultSchema) {
+ for (int i = 0; i < resultSchema.getFieldCount(); ++i) {
+ assertEquals(RecordFieldType.STRING.getDataType(),
resultSchema.getField(i).getDataType());
+ }
+ }
- for (final Object[] column : COLUMNS) {
+ private void thenAllColumnDataTypesAreCorrect(TestColumn[] columns,
RecordSchema expectedSchema, RecordSchema actualSchema) {
+ assertNotNull(actualSchema);
+
+ for (TestColumn column : columns) {
+ int fieldIndex = column.getIndex() - 1;
// The DECIMAL column with scale larger than precision will not
match so verify that instead
- DataType actualDataType = resultSchema.getField((Integer)
column[0] - 1).getDataType();
- DataType expectedDataType = (DataType) column[3];
+ DataType actualDataType =
actualSchema.getField(fieldIndex).getDataType();
+ DataType expectedDataType =
expectedSchema.getField(fieldIndex).getDataType();
if
(expectedDataType.equals(RecordFieldType.DECIMAL.getDecimalDataType(3, 10))) {
DecimalDataType decimalDataType = (DecimalDataType)
expectedDataType;
if (decimalDataType.getScale() >
decimalDataType.getPrecision()) {
expectedDataType =
RecordFieldType.DECIMAL.getDecimalDataType(decimalDataType.getScale(),
decimalDataType.getScale());
}
}
- assertEquals("For column " + column[0] + " the converted type is
not matching", expectedDataType, actualDataType);
+ assertEquals("For column " + column.getIndex() + " the converted
type is not matching", expectedDataType, actualDataType);
+ }
+ }
+
+ private void thenActualArrayElementTypesMatchExpected(Map<String,
DataType> expectedTypes, RecordSchema actualSchema) {
+ for (RecordField recordField : actualSchema.getFields()) {
+ if (recordField.getDataType() instanceof ArrayDataType) {
+ ArrayDataType arrayType = (ArrayDataType)
recordField.getDataType();
+ assertEquals("Array element type for " +
recordField.getFieldName()
+ + " is not of expected type " +
expectedTypes.get(recordField.getFieldName()).toString(),
+ expectedTypes.get(recordField.getFieldName()),
arrayType.getElementType());
+ } else {
+ fail("RecordField " + recordField.getFieldName() + " is not
instance of ArrayDataType");
+ }
+ }
+ }
+
+ private void thenAllDataTypesAreChoice(final List<RecordField>
inputFields, final RecordSchema resultSchema) {
+ assertEquals("The number of input fields does not match the number of
fields in the result schema.", inputFields.size(),
resultSchema.getFieldCount());
+
+ DataType expectedType = getBroadestChoiceDataType();
+ for (int i = 0; i < inputFields.size(); ++i) {
+ assertEquals(expectedType, resultSchema.getField(i).getDataType());
+ }
+ }
+
+ private DataType getBroadestChoiceDataType() {
+ List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT,
RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR,
RecordFieldType.DATE,
+ RecordFieldType.DECIMAL, RecordFieldType.DOUBLE,
RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG,
RecordFieldType.SHORT, RecordFieldType.STRING,
+ RecordFieldType.TIME, RecordFieldType.TIMESTAMP)
+ .map(RecordFieldType::getDataType)
+ .collect(Collectors.toList());
+ return RecordFieldType.CHOICE.getChoiceDataType(dataTypes);
+ }
+
+ private static class TestColumn {
+ private final int index; // Column indexing starts from 1, not 0.
+ private final String columnName;
+ private final int sqlType;
+ private final DataType recordFieldType;
+
+ public TestColumn(final int index, final String columnName, final int
sqlType, final DataType recordFieldType) {
+ this.index = index;
+ this.columnName = columnName;
+ this.sqlType = sqlType;
+ this.recordFieldType = recordFieldType;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public int getSqlType() {
+ return sqlType;
+ }
+
+ public DataType getRecordFieldType() {
+ return recordFieldType;
+ }
+ }
+
+ private static class ResultSqlArray implements Array {
+
+ @Override
+ public String getBaseTypeName() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public int getBaseType() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public Object getArray() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getArray(Map<String, Class<?>> map) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getArray(long index, int count) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public Object getArray(long index, int count, Map<String, Class<?>>
map) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public ResultSet getResultSet(Map<String, Class<?>> map) throws
SQLException {
+ return null;
+ }
+
+ @Override
+ public ResultSet getResultSet(long index, int count) throws
SQLException {
+ return null;
+ }
+
+ @Override
+ public ResultSet getResultSet(long index, int count, Map<String,
Class<?>> map) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void free() throws SQLException {
+
+ }
+ }
+
+ private static class ResultBigDecimal extends BigDecimal {
+ public static int PRECISION = 3;
+ public static int SCALE = 0;
+ public ResultBigDecimal() {
+ super("123");
+ }
+ }
+
+ private static class ArrayTestData {
+ final private String fieldName;
+ final private Object[] testArray;
+
+ public ArrayTestData(String fieldName, Object[] testArray) {
+ this.fieldName = fieldName;
+ this.testArray = testArray;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public Object[] getTestArray() {
+ return testArray;
}
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
index 9862383..30e6c51 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java
@@ -189,6 +189,10 @@ public class JdbcCommon {
return defaultScale;
}
+ public boolean isUseLogicalTypes() {
+ return useLogicalTypes;
+ }
+
public static class Builder {
private String recordName;
private int maxRows = 0;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
index f40acb2..18f7f12 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -70,7 +70,7 @@ public class RecordSqlWriter implements SqlWriter {
if (fullRecordSet == null) {
final Schema avroSchema = JdbcCommon.createSchema(resultSet,
options);
final RecordSchema recordAvroSchema =
AvroTypeUtil.createSchema(avroSchema);
- fullRecordSet = new ResultSetRecordSetWithCallback(resultSet,
recordAvroSchema, callback, options.getDefaultPrecision(),
options.getDefaultScale());
+ fullRecordSet = new ResultSetRecordSetWithCallback(resultSet,
recordAvroSchema, callback, options.getDefaultPrecision(),
options.getDefaultScale(), options.isUseLogicalTypes());
writeSchema =
recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
}
recordSet = (maxRowsPerFlowFile > 0) ?
fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
@@ -135,8 +135,8 @@ public class RecordSqlWriter implements SqlWriter {
private final ResultSetRowCallback callback;
ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema
readerSchema, ResultSetRowCallback callback,
- final int defaultPrecision, final int
defaultScale) throws SQLException {
- super(rs, readerSchema, defaultPrecision, defaultScale);
+ final int defaultPrecision, final int
defaultScale, final boolean useLogicalTypes) throws SQLException {
+ super(rs, readerSchema, defaultPrecision, defaultScale,
useLogicalTypes);
this.callback = callback;
}