This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5108d7c  NIFI-8439 Update parquet-avro to allow reading parquet INT96 timestamps as byte arrays (instead of throwing an exception).
5108d7c is described below

commit 5108d7cdd015ef14e98f2acfb61b0213972fe29e
Author: Tamas Palfy <[email protected]>
AuthorDate: Mon Apr 19 14:59:26 2021 +0200

    NIFI-8439 Update parquet-avro to allow reading parquet INT96 timestamps as byte arrays (instead of throwing an exception).
    
    Also allow writing them as such (as byte arrays), again instead of throwing an exception.
    
    NIFI-8439 Fixed unit tests.
    NIFI-8439 Allow writing parquet INT96 timestamps if they were read by the same parquet-avro library.
    
    This closes #5006.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../main/java/org/apache/nifi/avro/AvroTypeUtil.java  |  12 +++++++++++-
 .../nifi-parquet-processors/pom.xml                   |   2 +-
 .../apache/nifi/parquet/ParquetRecordSetWriter.java   |  18 ++++++++++++++++++
 .../org/apache/nifi/parquet/utils/ParquetConfig.java  |   9 +++++++++
 .../org/apache/nifi/parquet/utils/ParquetUtils.java   |   5 +++++
 .../processors/parquet/TestConvertAvroToParquet.java  |   4 ++--
 .../src/test/resources/avro/user-with-array.avsc      |   2 +-
 .../standard/ConversionWithSchemaInferenceIT.java     |   2 +-
 .../data.int_float_string.with_schema.json.to.avro    | Bin 322 -> 320 bytes
 9 files changed, 48 insertions(+), 6 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 3eb26e3..2d5ee94 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -741,7 +741,17 @@ public class AvroTypeUtil {
                     return ByteBuffer.wrap(((String) 
rawValue).getBytes(charset));
                 }
                 if (rawValue instanceof Object[]) {
-                    return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+                    if (fieldSchema.getType() == Type.FIXED && 
"INT96".equals(fieldSchema.getName())) {
+                        Object[] rawObjects = (Object[]) rawValue;
+                        byte[] rawBytes = new byte[rawObjects.length];
+                        for (int elementIndex = 0; elementIndex < 
rawObjects.length; elementIndex++) {
+                            rawBytes[elementIndex] = (Byte) 
rawObjects[elementIndex];
+                        }
+
+                        return new GenericData.Fixed(fieldSchema, rawBytes);
+                    } else {
+                        return AvroTypeUtil.convertByteArray((Object[]) 
rawValue);
+                    }
                 }
                 try {
                     if (rawValue instanceof Blob) {
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index e21aec1..db5654e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -58,7 +58,7 @@
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-avro</artifactId>
-            <version>1.10.0</version>
+            <version>1.12.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
index 85af65f..53d6e02 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
@@ -59,7 +59,16 @@ public class ParquetRecordSetWriter extends 
SchemaRegistryRecordSetWriter implem
             .required(true)
             .build();
 
+    public static final PropertyDescriptor INT96_FIELDS = new 
PropertyDescriptor.Builder()
+            .name("int96-fields")
+            .displayName("INT96 Fields")
+            .description("List of fields with full path that should be treated 
as INT96 timestamps.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .required(false)
+            .build();
+
     private LoadingCache<String, Schema> compiledAvroSchemaCache;
+    private String int96Fields;
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
@@ -67,12 +76,20 @@ public class ParquetRecordSetWriter extends 
SchemaRegistryRecordSetWriter implem
         compiledAvroSchemaCache = Caffeine.newBuilder()
                 .maximumSize(cacheSize)
                 .build(schemaText -> new Schema.Parser().parse(schemaText));
+
+        if (context.getProperty(INT96_FIELDS).isSet()) {
+            int96Fields = context.getProperty(INT96_FIELDS).getValue();
+        } else {
+            int96Fields = null;
+        }
     }
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema recordSchema,
                                         final OutputStream out, final 
Map<String, String> variables) throws IOException {
         final ParquetConfig parquetConfig = 
createParquetConfig(getConfigurationContext(), variables);
+        parquetConfig.setInt96Fields(int96Fields);
+
         try {
             final Schema avroSchema;
             try {
@@ -111,6 +128,7 @@ public class ParquetRecordSetWriter extends 
SchemaRegistryRecordSetWriter implem
         properties.add(ParquetUtils.WRITER_VERSION);
         properties.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE);
         properties.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS);
+        properties.add(INT96_FIELDS);
         return properties;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
index 5cc4abe..eb9ad52 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
@@ -34,6 +34,7 @@ public class ParquetConfig {
     private ParquetProperties.WriterVersion writerVersion;
     private ParquetFileWriter.Mode writerMode;
     private CompressionCodecName compressionCodec;
+    private String int96Fields;
 
     public Integer getRowGroupSize() {
         return rowGroupSize;
@@ -130,4 +131,12 @@ public class ParquetConfig {
     public void setCompressionCodec(CompressionCodecName compressionCodec) {
         this.compressionCodec = compressionCodec;
     }
+
+    public String getInt96Fields() {
+        return int96Fields;
+    }
+
+    public void setInt96Fields(String int96Fields) {
+        this.int96Fields = int96Fields;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
index e7c1c37..347d243 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
@@ -310,5 +310,10 @@ public class ParquetUtils {
             conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
                     
parquetConfig.getAvroWriteOldListStructure().booleanValue());
         }
+        conf.setBoolean(AvroReadSupport.READ_INT96_AS_FIXED, true);
+        if (parquetConfig.getInt96Fields() != null) {
+            conf.setStrings("parquet.avro.writeFixedAsInt96",
+                parquetConfig.getInt96Fields());
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
index 261de9b..00bd23e 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
@@ -244,8 +244,8 @@ public class TestConvertAvroToParquet {
         
assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element",
 0), 2);
 
         // Map
-        
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",0).getInteger("value",
 0), 1);
-        
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",1).getInteger("value",
 0), 2);
+        
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",0).getInteger("value",
 0), 1);
+        
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",1).getInteger("value",
 0), 2);
 
         // Fixed
         assertEquals(firstRecord.getString("myfixed",0), "A");
diff --git 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
index 67a0cca..9fe2059 100644
--- 
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
+++ 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
@@ -4,6 +4,6 @@
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_colors",  "type": { "type": "array", "items": 
["string","null"] }, "default": null }
+     {"name": "favorite_colors",  "type": { "type": "array", "items": 
["string","null"] }, "default": [] }
  ]
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
index bed820c..24970de 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
@@ -41,7 +41,7 @@ public class ConversionWithSchemaInferenceIT extends 
AbstractConversionIT {
     public void testJsonToAvro() throws Exception {
         fromJson(jsonPostfix());
 
-        // JSON schema inference doesn't discern INT and FLOAT but uses LONG 
and DOUBLE instead.
+        // JSON schema inference doesn't discern FLOAT but uses DOUBLE instead.
         //  So the expected avro is a little bit different as the deserialized 
values also end up in
         //      Long and Double objects
         toAvro("with_schema.json.to.avro");
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
index 4177b05..0293c07 100644
Binary files 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 and 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
 differ

Reply via email to