This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5108d7c NIFI-8439 Update parquet-avro to allow reading parquet INT96
timestamps as byte arrays (instead of throwing an exception).
5108d7c is described below
commit 5108d7cdd015ef14e98f2acfb61b0213972fe29e
Author: Tamas Palfy <[email protected]>
AuthorDate: Mon Apr 19 14:59:26 2021 +0200
NIFI-8439 Update parquet-avro to allow reading parquet INT96 timestamps as
byte arrays (instead of throwing an exception).
Also allow writing them as such (byte arrays) - again, instead of throwing
an exception.
NIFI-8439 Fixed unit tests.
NIFI-8439 Allow writing parquet INT96 timestamps if they were read by the
same parquet-avro library.
This closes #5006.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../main/java/org/apache/nifi/avro/AvroTypeUtil.java | 12 +++++++++++-
.../nifi-parquet-processors/pom.xml | 2 +-
.../apache/nifi/parquet/ParquetRecordSetWriter.java | 18 ++++++++++++++++++
.../org/apache/nifi/parquet/utils/ParquetConfig.java | 9 +++++++++
.../org/apache/nifi/parquet/utils/ParquetUtils.java | 5 +++++
.../processors/parquet/TestConvertAvroToParquet.java | 4 ++--
.../src/test/resources/avro/user-with-array.avsc | 2 +-
.../standard/ConversionWithSchemaInferenceIT.java | 2 +-
.../data.int_float_string.with_schema.json.to.avro | Bin 322 -> 320 bytes
9 files changed, 48 insertions(+), 6 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 3eb26e3..2d5ee94 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -741,7 +741,17 @@ public class AvroTypeUtil {
return ByteBuffer.wrap(((String)
rawValue).getBytes(charset));
}
if (rawValue instanceof Object[]) {
- return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+ if (fieldSchema.getType() == Type.FIXED &&
"INT96".equals(fieldSchema.getName())) {
+ Object[] rawObjects = (Object[]) rawValue;
+ byte[] rawBytes = new byte[rawObjects.length];
+ for (int elementIndex = 0; elementIndex <
rawObjects.length; elementIndex++) {
+ rawBytes[elementIndex] = (Byte)
rawObjects[elementIndex];
+ }
+
+ return new GenericData.Fixed(fieldSchema, rawBytes);
+ } else {
+ return AvroTypeUtil.convertByteArray((Object[])
rawValue);
+ }
}
try {
if (rawValue instanceof Blob) {
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index e21aec1..db5654e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -58,7 +58,7 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
- <version>1.10.0</version>
+ <version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
index 85af65f..53d6e02 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
@@ -59,7 +59,16 @@ public class ParquetRecordSetWriter extends
SchemaRegistryRecordSetWriter implem
.required(true)
.build();
+ public static final PropertyDescriptor INT96_FIELDS = new
PropertyDescriptor.Builder()
+ .name("int96-fields")
+ .displayName("INT96 Fields")
+ .description("List of fields with full path that should be treated
as INT96 timestamps.")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .required(false)
+ .build();
+
private LoadingCache<String, Schema> compiledAvroSchemaCache;
+ private String int96Fields;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
@@ -67,12 +76,20 @@ public class ParquetRecordSetWriter extends
SchemaRegistryRecordSetWriter implem
compiledAvroSchemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build(schemaText -> new Schema.Parser().parse(schemaText));
+
+ if (context.getProperty(INT96_FIELDS).isSet()) {
+ int96Fields = context.getProperty(INT96_FIELDS).getValue();
+ } else {
+ int96Fields = null;
+ }
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final
RecordSchema recordSchema,
final OutputStream out, final
Map<String, String> variables) throws IOException {
final ParquetConfig parquetConfig =
createParquetConfig(getConfigurationContext(), variables);
+ parquetConfig.setInt96Fields(int96Fields);
+
try {
final Schema avroSchema;
try {
@@ -111,6 +128,7 @@ public class ParquetRecordSetWriter extends
SchemaRegistryRecordSetWriter implem
properties.add(ParquetUtils.WRITER_VERSION);
properties.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE);
properties.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS);
+ properties.add(INT96_FIELDS);
return properties;
}
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
index 5cc4abe..eb9ad52 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
@@ -34,6 +34,7 @@ public class ParquetConfig {
private ParquetProperties.WriterVersion writerVersion;
private ParquetFileWriter.Mode writerMode;
private CompressionCodecName compressionCodec;
+ private String int96Fields;
public Integer getRowGroupSize() {
return rowGroupSize;
@@ -130,4 +131,12 @@ public class ParquetConfig {
public void setCompressionCodec(CompressionCodecName compressionCodec) {
this.compressionCodec = compressionCodec;
}
+
+ public String getInt96Fields() {
+ return int96Fields;
+ }
+
+ public void setInt96Fields(String int96Fields) {
+ this.int96Fields = int96Fields;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
index e7c1c37..347d243 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java
@@ -310,5 +310,10 @@ public class ParquetUtils {
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
parquetConfig.getAvroWriteOldListStructure().booleanValue());
}
+ conf.setBoolean(AvroReadSupport.READ_INT96_AS_FIXED, true);
+ if (parquetConfig.getInt96Fields() != null) {
+ conf.setStrings("parquet.avro.writeFixedAsInt96",
+ parquetConfig.getInt96Fields());
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
index 261de9b..00bd23e 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
@@ -244,8 +244,8 @@ public class TestConvertAvroToParquet {
assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element",
0), 2);
// Map
-
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",0).getInteger("value",
0), 1);
-
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",1).getInteger("value",
0), 2);
+
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",0).getInteger("value",
0), 1);
+
assertEquals(firstRecord.getGroup("mymap",0).getGroup("key_value",1).getInteger("value",
0), 2);
// Fixed
assertEquals(firstRecord.getString("myfixed",0), "A");
diff --git
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
index 67a0cca..9fe2059 100644
---
a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
+++
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user-with-array.avsc
@@ -4,6 +4,6 @@
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
- {"name": "favorite_colors", "type": { "type": "array", "items":
["string","null"] }, "default": null }
+ {"name": "favorite_colors", "type": { "type": "array", "items":
["string","null"] }, "default": [] }
]
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
index bed820c..24970de 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/ConversionWithSchemaInferenceIT.java
@@ -41,7 +41,7 @@ public class ConversionWithSchemaInferenceIT extends
AbstractConversionIT {
public void testJsonToAvro() throws Exception {
fromJson(jsonPostfix());
- // JSON schema inference doesn't discern INT and FLOAT but uses LONG
and DOUBLE instead.
+ // JSON schema inference doesn't discern FLOAT but uses DOUBLE instead.
// So the expected avro is a little bit different as the deserialized
values also end up in
// Long and Double objects
toAvro("with_schema.json.to.avro");
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
index 4177b05..0293c07 100644
Binary files
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
and
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConversions/data.int_float_string.with_schema.json.to.avro
differ