mark-bathori commented on code in PR #7421:
URL: https://github.com/apache/nifi/pull/7421#discussion_r1259330971


##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -130,6 +132,9 @@ static class UUIDtoByteArrayConverter extends 
DataConverter<Object, byte[]> {
         @Override
         public byte[] convert(Object data) {
             final UUID uuid = DataTypeUtils.toUUID(data);
+            if (uuid == null) {

Review Comment:
   This part is unreachable since the `DataTypeUtils.toUUID` will throw a 
`IllegalTypeConversionException` if you pass a null value to it.



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/IcebergRecordConverter.java:
##########
@@ -123,8 +135,12 @@ private static class IcebergSchemaVisitor extends 
SchemaWithPartnerVisitor<DataT
             // set NiFi schema field names (sourceFieldName) in the data 
converters
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<String> mappedFieldName = 
recordType.getNameMapping(converter.getTargetFieldName());
-                final Optional<RecordField> recordField = 
recordSchema.getField(mappedFieldName.get());
-                converter.setSourceFieldName(recordField.get().getFieldName());
+                if (mappedFieldName.isEmpty()) {
+                    
converter.setSourceFieldName(converter.getTargetFieldName());
+                } else {
+                    final Optional<RecordField> recordField = 
recordSchema.getField(mappedFieldName.get());
+                    
converter.setSourceFieldName(recordField.get().getFieldName());
+                }

Review Comment:
   I think we shouldn't set the `target field name` to the` source field name` 
because it is confusing. The `source name` represents the `NiFi record field 
name` and the `target name `represents the `iceberg record field name`. We 
should leave it null and set only the target.
   ```suggestion
                   if (mappedFieldName.isPresent()) {
                       final Optional<RecordField> recordField = 
recordSchema.getField(mappedFieldName.get());
                       
converter.setSourceFieldName(recordField.get().getFieldName());
                   }
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/converter/GenericDataConverters.java:
##########
@@ -245,9 +250,14 @@ static class RecordConverter extends DataConverter<Record, 
GenericRecord> {
 
             for (DataConverter<?, ?> converter : converters) {
                 final Optional<RecordField> recordField = 
recordSchema.getField(converter.getSourceFieldName());
-                final RecordField field = recordField.get();
-                // creates a record field accessor for every data converter
-                getters.put(converter.getTargetFieldName(), 
createFieldGetter(field.getDataType(), field.getFieldName(), 
field.isNullable()));
+                if (recordField.isEmpty()) {
+                    Types.NestedField missingField = 
schema.field(converter.getSourceFieldName());

Review Comment:
   Here you should get the `targetFieldName` since it contains the iceberg 
schema field's name.
   ```suggestion
                       Types.NestedField missingField = 
schema.field(converter.getTargetFieldName());
   ```



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestIcebergRecordConverter.java:
##########
@@ -466,19 +507,104 @@ public void testPrimitives(FileFormat format) throws 
IOException {
         assertEquals(LocalDateTime.of(2017, 4, 4, 14, 20, 33, 789000000), 
resultRecord.get(12, LocalDateTime.class));
         assertEquals(Integer.valueOf(10), resultRecord.get(14, Integer.class));
 
-        if (format.equals(PARQUET)) {
-            assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0}, resultRecord.get(13, byte[].class));
-        } else {
-            assertEquals(UUID.fromString("0000-00-00-00-000000"), 
resultRecord.get(13, UUID.class));
-        }
+        assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0}, resultRecord.get(13, byte[].class));
     }
 
     @DisabledOnOs(WINDOWS)
-    @ParameterizedTest
-    @EnumSource(value = FileFormat.class, names = {"AVRO", "ORC", "PARQUET"})
-    public void testCompatiblePrimitives(FileFormat format) throws IOException 
{
+    @Test
+    public void testPrimitivesIgnoreMissingFields() throws IOException {

Review Comment:
   If I set the Iceberg schema fields from optional to required the new tests 
are failing on `AVRO` and `PARQUET`. This is why it is not recommended to 
remove the `ParameterizedTest` since it is testing different writers for the 
different formats. 



##########
nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java:
##########
@@ -134,12 +133,12 @@ private void initCatalog(PartitionSpec spec, String 
fileFormat) throws Initializ
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"avro"})
-    public void onTriggerPartitioned(String fileFormat) throws Exception {
+    @Test
+    public void onTriggerPartitioned() throws Exception {
         PartitionSpec spec = PartitionSpec.builderFor(USER_SCHEMA)
                 .bucket("department", 3)
                 .build();
+        final String fileFormat = "avro";

Review Comment:
   Please use `org.apache.iceberg.FileFormat` instead of strings. Also you can 
pass this parameter directly to the `initCatalog` method since it is only used 
there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to