mark-bathori commented on code in PR #7239:
URL: https://github.com/apache/nifi/pull/7239#discussion_r1200441738


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())

Review Comment:
   The `isEmpty` check seems to be unnecessary here, since `getDateTimeFormatter` 
checks the `format` parameter for null.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);

Review Comment:
   I think you can call the type conversion functions directly, e.g. 
`DataTypeUtils.toBoolean`, `DataTypeUtils.toInteger`, etc.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())
+                            ? null
+                            : 
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), 
ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.UUID.getDataType(), null);
+                case STRING:
+                default:
+                    return DataTypeUtils.convertRecordFieldtoObject(data, 
RecordFieldType.STRING.getDataType());
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            if (data instanceof Time) {
+                return ((Time) data).toLocalTime();
+            }
+            return DataTypeUtils.toTime(data, () -> 
StringUtils.isEmpty(timeFormat)

Review Comment:
   Same here, `getDateFormat` checks null on the format parameter.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())
+                            ? null
+                            : 
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), 
ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.UUID.getDataType(), null);
+                case STRING:
+                default:
+                    return DataTypeUtils.convertRecordFieldtoObject(data, 
RecordFieldType.STRING.getDataType());
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            if (data instanceof Time) {
+                return ((Time) data).toLocalTime();
+            }
+            return DataTypeUtils.toTime(data, () -> 
StringUtils.isEmpty(timeFormat)
+                    ? null
+                    : DataTypeUtils.getDateFormat(timeFormat), 
null).toLocalTime();
         }
     }
 
-    static class TimestampConverter extends DataConverter<Timestamp, 
LocalDateTime> {
+    static class TimestampConverter extends DataConverter<Object, 
LocalDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public LocalDateTime convert(Timestamp data) {
-            return data.toLocalDateTime();
+        public LocalDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = (Timestamp) 
DataTypeUtils.convertType(data, 
RecordFieldType.TIMESTAMP.getDataType(dataType.getFormat()),

Review Comment:
   You can call `DataTypeUtils.toTimestamp` here so you can eliminate a lot of 
unnecessary `null` parameters.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())
+                            ? null
+                            : 
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), 
ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.UUID.getDataType(), null);
+                case STRING:
+                default:
+                    return DataTypeUtils.convertRecordFieldtoObject(data, 
RecordFieldType.STRING.getDataType());
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            if (data instanceof Time) {
+                return ((Time) data).toLocalTime();
+            }
+            return DataTypeUtils.toTime(data, () -> 
StringUtils.isEmpty(timeFormat)
+                    ? null
+                    : DataTypeUtils.getDateFormat(timeFormat), 
null).toLocalTime();
         }
     }
 
-    static class TimestampConverter extends DataConverter<Timestamp, 
LocalDateTime> {
+    static class TimestampConverter extends DataConverter<Object, 
LocalDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public LocalDateTime convert(Timestamp data) {
-            return data.toLocalDateTime();
+        public LocalDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = (Timestamp) 
DataTypeUtils.convertType(data, 
RecordFieldType.TIMESTAMP.getDataType(dataType.getFormat()),
+                    null, null, () -> 
DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+            return convertedTimestamp.toLocalDateTime();
         }
     }
 
-    static class TimestampWithTimezoneConverter extends 
DataConverter<Timestamp, OffsetDateTime> {
+    static class TimestampWithTimezoneConverter extends DataConverter<Object, 
OffsetDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampWithTimezoneConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public OffsetDateTime convert(Timestamp data) {
-            return OffsetDateTime.ofInstant(data.toInstant(), 
ZoneId.of("UTC"));
+        public OffsetDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = (Timestamp) 
DataTypeUtils.convertType(data, 
RecordFieldType.TIMESTAMP.getDataType(dataType.getFormat()),

Review Comment:
   Same here, you can call `DataTypeUtils.toTimestamp` directly.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())
+                            ? null
+                            : 
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), 
ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.UUID.getDataType(), null);
+                case STRING:
+                default:
+                    return DataTypeUtils.convertRecordFieldtoObject(data, 
RecordFieldType.STRING.getDataType());
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            if (data instanceof Time) {

Review Comment:
   This type check is unnecessary because `DataTypeUtils.toTime` contains the 
same logic, and `toLocalTime` is also called in the line below.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -85,21 +93,25 @@ private static class IcebergSchemaVisitor extends 
SchemaWithPartnerVisitor<DataT
                     case DOUBLE:
                     case DATE:
                     case STRING:
-                        return new GenericDataConverters.SameTypeConverter();
+                        return new 
GenericDataConverters.PrimitiveTypeConverter(type, dataType);
                     case TIME:
-                        return new GenericDataConverters.TimeConverter();
+                        return new 
GenericDataConverters.TimeConverter(dataType.getFormat());
                     case TIMESTAMP:
                         final Types.TimestampType timestampType = 
(Types.TimestampType) type;
                         if (timestampType.shouldAdjustToUTC()) {
-                            return new 
GenericDataConverters.TimestampWithTimezoneConverter();
+                            return new 
GenericDataConverters.TimestampWithTimezoneConverter(dataType);
                         }
-                        return new GenericDataConverters.TimestampConverter();
+                        return new 
GenericDataConverters.TimestampConverter(dataType);
                     case UUID:
-                        final UUIDDataType uuidType = (UUIDDataType) dataType;
-                        if (uuidType.getFileFormat() == FileFormat.PARQUET) {
+                        if (dataType instanceof UUIDDataType) {
+                            final UUIDDataType uuidType = (UUIDDataType) 
dataType;
+                            if (uuidType.getFileFormat() == 
FileFormat.PARQUET) {
+                                return new 
GenericDataConverters.UUIDtoByteArrayConverter();
+                            }
+                        } else if (type instanceof Types.UUIDType && 
fileFormat == FileFormat.PARQUET) {

Review Comment:
   This seems to duplicate the above logic, and passing the `fileFormat` down 
here is also unnecessary.
   The `IcebergPartnerAccessors` should be extended to create a `UUIDDataType` 
when the target field's type is UUID so this part can be reverted.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -47,45 +51,106 @@
  */
 public class GenericDataConverters {
 
-    static class SameTypeConverter extends DataConverter<Object, Object> {
+    static class PrimitiveTypeConverter extends DataConverter<Object, Object> {
+        final Type.PrimitiveType targetType;
+        final DataType sourceType;
+
+        public PrimitiveTypeConverter(final Type.PrimitiveType type, final 
DataType dataType) {
+            targetType = type;
+            sourceType = dataType;
+        }
 
         @Override
         public Object convert(Object data) {
-            return data;
+            switch (targetType.typeId()) {
+                case BOOLEAN:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.BOOLEAN.getDataType(), null);
+                case INTEGER:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.INT.getDataType(), null);
+                case LONG:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.LONG.getDataType(), null);
+                case FLOAT:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.FLOAT.getDataType(), null);
+                case DOUBLE:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.DOUBLE.getDataType(), null);
+                case DATE:
+                    return DataTypeUtils.toLocalDate(data, () -> 
StringUtils.isEmpty(sourceType.getFormat())
+                            ? null
+                            : 
DataTypeUtils.getDateTimeFormatter(sourceType.getFormat(), 
ZoneId.systemDefault()), null);
+                case UUID:
+                    return DataTypeUtils.convertType(data, 
RecordFieldType.UUID.getDataType(), null);
+                case STRING:
+                default:
+                    return DataTypeUtils.convertRecordFieldtoObject(data, 
RecordFieldType.STRING.getDataType());
+            }
         }
     }
 
-    static class TimeConverter extends DataConverter<Time, LocalTime> {
+    static class TimeConverter extends DataConverter<Object, LocalTime> {
+
+        private final String timeFormat;
+
+        public TimeConverter(final String format) {
+            this.timeFormat = format;
+        }
 
         @Override
-        public LocalTime convert(Time data) {
-            return data.toLocalTime();
+        public LocalTime convert(Object data) {
+            if (data instanceof Time) {
+                return ((Time) data).toLocalTime();
+            }
+            return DataTypeUtils.toTime(data, () -> 
StringUtils.isEmpty(timeFormat)
+                    ? null
+                    : DataTypeUtils.getDateFormat(timeFormat), 
null).toLocalTime();
         }
     }
 
-    static class TimestampConverter extends DataConverter<Timestamp, 
LocalDateTime> {
+    static class TimestampConverter extends DataConverter<Object, 
LocalDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public LocalDateTime convert(Timestamp data) {
-            return data.toLocalDateTime();
+        public LocalDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = (Timestamp) 
DataTypeUtils.convertType(data, 
RecordFieldType.TIMESTAMP.getDataType(dataType.getFormat()),
+                    null, null, () -> 
DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+            return convertedTimestamp.toLocalDateTime();
         }
     }
 
-    static class TimestampWithTimezoneConverter extends 
DataConverter<Timestamp, OffsetDateTime> {
+    static class TimestampWithTimezoneConverter extends DataConverter<Object, 
OffsetDateTime> {
+
+        private final DataType dataType;
+
+        public TimestampWithTimezoneConverter(final DataType dataType) {
+            this.dataType = dataType;
+        }
 
         @Override
-        public OffsetDateTime convert(Timestamp data) {
-            return OffsetDateTime.ofInstant(data.toInstant(), 
ZoneId.of("UTC"));
+        public OffsetDateTime convert(Object data) {
+            final Timestamp convertedTimestamp = (Timestamp) 
DataTypeUtils.convertType(data, 
RecordFieldType.TIMESTAMP.getDataType(dataType.getFormat()),
+                    null, null, () -> 
DataTypeUtils.getDateFormat(dataType.getFormat()), null);
+            return OffsetDateTime.ofInstant(convertedTimestamp.toInstant(), 
ZoneId.of("UTC"));
         }
     }
 
-    static class UUIDtoByteArrayConverter extends DataConverter<UUID, byte[]> {
+    static class UUIDtoByteArrayConverter extends DataConverter<Object, 
byte[]> {
 
         @Override
-        public byte[] convert(UUID data) {
+        public byte[] convert(Object data) {
+            final UUID uuid;

Review Comment:
   The `DataTypeUtils.toUUID` method contains this UUID-related logic, so you 
can replace this part with that method call.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -221,6 +239,26 @@ private static RecordSchema getPrimitivesSchema() {
         return new SimpleRecordSchema(fields);
     }
 
+    private static RecordSchema getPrimitivesAsCompatiblesSchema() {
+        List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("string", 
RecordFieldType.STRING.getDataType()));

Review Comment:
   I think this field's type should be changed to something else to cover the 
String conversion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to