This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a53b2d023f6 CAMEL-19586: camel-parquet-avro - Allow users to unmarshal 
Parquet file into Avro's GenericRecords (#10605)
a53b2d023f6 is described below

commit a53b2d023f65f0b8aea281e9684c15a341c19781
Author: Kengo Seki <[email protected]>
AuthorDate: Sat Jul 8 00:08:34 2023 +0900

    CAMEL-19586: camel-parquet-avro - Allow users to unmarshal Parquet file 
into Avro's GenericRecords (#10605)
---
 .../camel/catalog/dataformats/parquetAvro.json     |  2 +-
 .../apache/camel/catalog/models/parquetAvro.json   |  2 +-
 .../apache/camel/catalog/schemas/camel-spring.xsd  |  5 +-
 .../camel/dataformat/parquet/avro/parquetAvro.json |  2 +-
 .../src/main/docs/parquetAvro-dataformat.adoc      |  6 +-
 .../parquet/avro/ParquetAvroDataFormat.java        | 44 ++++++++++--
 ...quetAvroDataFormatWithoutUnmarshalTypeTest.java | 84 ++++++++++++++++++++++
 .../apache/camel/model/dataformat/parquetAvro.json |  2 +-
 .../model/dataformat/ParquetAvroDataFormat.java    |  3 +-
 9 files changed, 133 insertions(+), 17 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
index b4658eaca90..63a4b60b424 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
@@ -17,7 +17,7 @@
   },
   "properties": {
     "compressionCodecName": { "index": 0, "kind": "attribute", "displayName": 
"Compression Codec Name", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "GZIP", "description": "Compression codec to use when 
marshalling." },
-    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when unmarshalling." },
+    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when (un)marshalling. If omitted, Parquet files 
are converted into Avro's GenericRecords for unmarshalling and input objects 
are assumed to be GenericRecords for marshalling." },
     "id": { "index": 2, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The id of this node" }
   }
 }
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
index 7d86486496e..90899fe4f97 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
@@ -14,7 +14,7 @@
   },
   "properties": {
     "compressionCodecName": { "index": 0, "kind": "attribute", "displayName": 
"Compression Codec Name", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "GZIP", "description": "Compression codec to use when 
marshalling." },
-    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when unmarshalling." },
+    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when (un)marshalling. If omitted, Parquet files 
are converted into Avro's GenericRecords for unmarshalling and input objects 
are assumed to be GenericRecords for marshalling." },
     "id": { "index": 2, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The id of this node" }
   }
 }
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 52d3831c7a7..36d9129b9a1 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -8743,7 +8743,8 @@ Compression codec to use when marshalling. Default value: 
GZIP
           <xs:annotation>
             <xs:documentation xml:lang="en">
 <![CDATA[
-Class to use when unmarshalling.
+Class to use when (un)marshalling. If omitted, Parquet files are converted 
into Avro's GenericRecords for unmarshalling
+and input objects are assumed to be GenericRecords for marshalling.
 ]]>
             </xs:documentation>
           </xs:annotation>
@@ -17202,4 +17203,4 @@ An optional certificate alias to use. This is useful 
when the keystore has multi
       <xs:enumeration value="TransactionErrorHandler"/>
     </xs:restriction>
   </xs:simpleType>
-</xs:schema>
\ No newline at end of file
+</xs:schema>
diff --git 
a/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
 
b/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
index b4658eaca90..63a4b60b424 100644
--- 
a/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
+++ 
b/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
@@ -17,7 +17,7 @@
   },
   "properties": {
     "compressionCodecName": { "index": 0, "kind": "attribute", "displayName": 
"Compression Codec Name", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "GZIP", "description": "Compression codec to use when 
marshalling." },
-    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when unmarshalling." },
+    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when (un)marshalling. If omitted, Parquet files 
are converted into Avro's GenericRecords for unmarshalling and input objects 
are assumed to be GenericRecords for marshalling." },
     "id": { "index": 2, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The id of this node" }
   }
 }
diff --git 
a/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc 
b/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
index c73f0d3785a..6384cb17e8c 100644
--- a/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
+++ b/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
@@ -11,7 +11,7 @@
 
 *Since Camel {since}*
 
-The ParquetAvro Data Format is a Camel Framework's data format implementation 
based on parquet-avro library for (de)/serialization purposes. Messages can be 
unmarshalled (conversion to simple Java POJO(s)) to plain Java objects. By the 
help of Camel's routing engine and data transformations you can then play with 
POJO(s) and apply customised formatting and call other Camel Component's to 
convert and send messages to upstream systems.
+The ParquetAvro Data Format is a Camel data format implementation based on the parquet-avro library for (de)serialization purposes. Messages can be unmarshalled to Avro's GenericRecords or plain Java objects (POJOs). With the help of Camel's routing engine and data transformations you can then work with them, apply customised formatting, and call other Camel components to convert and send messages to upstream systems.
 
 == Parquet Data Format Options
 
@@ -23,7 +23,7 @@ include::partial$dataformat-options.adoc[]
 
 There are ways to unmarshal parquet files/structures (Usually binary parquet 
files) where camel DSL allows
 
-In this first example we unmarshal file payload to OutputStream and send it to 
mock endpoint, then we will be able to get Pojo (it could be a list if that is 
coming through)
+In this first example we unmarshal the file payload and send the result to a mock endpoint, where we can get the GenericRecords or POJOs (possibly as a list, if multiple records come through).
 
 [source,java]
 -----------------------------------------------------------------------
@@ -32,7 +32,7 @@ 
from("direct:unmarshal").unmarshal(parquet).to("mock:unmarshal");
 
 == Marshal
 
-Marshalling is the reverse process of unmarshalling so when you have your Pojo 
and marshal it you will get the parquet formatted output stream on your 
producer endpoint.
+Marshalling is the reverse process of unmarshalling: when you marshal your GenericRecords or POJOs, you get the Parquet-formatted output stream on your producer endpoint.
 
 [source,java]
 -----------------------------------------------------------------------
diff --git 
a/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
 
b/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
index d150492f6a7..6d7a4f7b7d2 100644
--- 
a/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
+++ 
b/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
@@ -22,6 +22,11 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.DataFormat;
@@ -36,6 +41,8 @@ import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
 import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
@@ -43,6 +50,8 @@ import static 
org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 @Dataformat("parquetAvro")
 public class ParquetAvroDataFormat extends ServiceSupport implements 
DataFormat, DataFormatName {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ParquetAvroDataFormat.class);
+
     private static final DefaultUuidGenerator DEFAULT_UUID_GENERATOR = new 
DefaultUuidGenerator();
 
     private CompressionCodecName compressionCodecName = GZIP;
@@ -54,7 +63,7 @@ public class ParquetAvroDataFormat extends ServiceSupport 
implements DataFormat,
     }
 
     public void marshal(Exchange exchange, Object graph, OutputStream stream) 
throws Exception {
-        // marshal from the Java object (graph) to the parquet-avro type
+        // marshal from the Java object or GenericRecord (graph) to the 
parquet-avro type
         Configuration conf = new Configuration();
 
         FileSystem.get(conf).setWriteChecksum(false);
@@ -66,9 +75,24 @@ public class ParquetAvroDataFormat extends ServiceSupport 
implements DataFormat,
 
         List<?> list = (List<?>) graph;
 
+        Schema schema = null;
+        GenericData model = null;
+        if (unmarshalType != null) {
+            try {
+                schema = ReflectData.AllowNull.get().getSchema(unmarshalType); 
// generate nullable fields
+                model = ReflectData.get();
+            } catch (AvroRuntimeException e) {
+                LOG.warn("Fall back to use GenericRecord instead of POJO for 
marshalling", e);
+            }
+        }
+        if (schema == null) {
+            schema = GenericContainer.class.cast(list.get(0)).getSchema();
+            model = GenericData.get();
+        }
+
         try (ParquetWriter writer = 
AvroParquetWriter.builder(parquetOutputStream)
-                
.withSchema(ReflectData.AllowNull.get().getSchema(unmarshalType)) // generate 
nullable fields
-                .withDataModel(ReflectData.get())
+                .withSchema(schema)
+                .withDataModel(model)
                 .withConf(conf)
                 .withCompressionCodec(compressionCodecName)
                 .withWriteMode(OVERWRITE)
@@ -80,7 +104,7 @@ public class ParquetAvroDataFormat extends ServiceSupport 
implements DataFormat,
     }
 
     public Object unmarshal(Exchange exchange, InputStream stream) throws 
Exception {
-        // unmarshal from the input stream of parquet-avro to Java object 
(graph)
+        // unmarshal from the input stream of parquet-avro to Java object or 
GenericRecord (graph)
         List<Object> parquetObjects = new ArrayList<>();
         Configuration conf = new Configuration();
 
@@ -88,14 +112,20 @@ public class ParquetAvroDataFormat extends ServiceSupport 
implements DataFormat,
                 DEFAULT_UUID_GENERATOR.generateUuid(),
                 stream.readAllBytes());
 
+        Class<?> type = GenericRecord.class;
+        GenericData model = GenericData.get();
+        if (unmarshalType != null) {
+            type = unmarshalType;
+            model = new ReflectData(unmarshalType.getClassLoader());
+        }
+
         try (ParquetReader reader = 
AvroParquetReader.builder(parquetInputStream)
-                .withDataModel(new ReflectData(unmarshalType.getClassLoader()))
+                .withDataModel(model)
                 .disableCompatibility() // always use this (since this is a 
new project)
                 .withConf(conf)
                 .build()) {
-
             Object pojo;
-            while ((pojo = unmarshalType.cast(reader.read())) != null) {
+            while ((pojo = type.cast(reader.read())) != null) {
                 parquetObjects.add(pojo);
             }
         }
diff --git 
a/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
 
b/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
new file mode 100644
index 00000000000..4c909673815
--- /dev/null
+++ 
b/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.parquet.avro;
+
+import java.util.List;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultUuidGenerator;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Verifies that the parquetAvro data format can work without an unmarshal type.
+ * <p>
+ * Parquet data is first produced from {@link Pojo} instances (with {@code unmarshalType} set to
+ * {@code Pojo.class}). A second format instance without an unmarshal type then unmarshals that
+ * data into Avro {@link GenericRecord}s and marshals them back to Parquet.
+ */
+public class ParquetAvroDataFormatWithoutUnmarshalTypeTest extends 
CamelTestSupport {
+
+    /**
+     * Sends three {@link Pojo}s through the routes and checks two things.
+     * <p>
+     * First, each unmarshalled GenericRecord carries the same {@code id} and {@code data} values
+     * as the corresponding input POJO. Second, the re-marshalled Parquet payload contains the same
+     * number of records as the input.
+     * <p>
+     * NOTE(review): the method name mentions "Map", but the payload is a List of Pojo; a name such
+     * as testMarshalAndUnmarshalPojoListWithoutUnmarshalType would describe it better.
+     */
+    @Test
+    public void testMarshalAndUnmarshalMapWithoutUnmarshalType() throws 
Exception {
+        List<Pojo> in = List.of(
+                new Pojo(1, "airport"),
+                new Pojo(2, "penguin"),
+                new Pojo(3, "verb"));
+        // Exactly one exchange is expected at each mock: the whole list travels as a single body.
+        MockEndpoint unmarshalMock = getMockEndpoint("mock:unmarshalled");
+        unmarshalMock.expectedMessageCount(1);
+
+        MockEndpoint marshalMock = getMockEndpoint("mock:marshalled");
+        marshalMock.expectedMessageCount(1);
+
+        template.sendBody("direct:in", in);
+        unmarshalMock.assertIsSatisfied();
+        marshalMock.assertIsSatisfied();
+
+        // NOTE(review): getBody(List.class) returns a raw List, so this is an unchecked conversion
+        // to List<GenericRecord>; the element type is only enforced by the cast in the loop below.
+        List<GenericRecord> records = 
unmarshalMock.getExchanges().get(0).getMessage().getBody(List.class);
+        assertEquals(in.size(), records.size());
+        for (int i = 0; i < records.size(); i++) {
+            GenericRecord record = GenericRecord.class.cast(records.get(i));
+            assertEquals(in.get(i).getId(), record.get("id"));
+            // toString(): presumably the string field comes back as an Avro CharSequence (e.g. Utf8)
+            // rather than java.lang.String, so it is converted before comparing - confirm.
+            assertEquals(in.get(i).getData(), record.get("data").toString());
+        }
+
+        // NOTE(review): getIn() is used here while getMessage() is used above; getMessage() would be
+        // consistent.
+        byte[] marshalled = 
marshalMock.getExchanges().get(0).getIn().getBody(byte[].class);
+        // Wrap the re-marshalled bytes so ParquetFileReader can open them; only the record count
+        // is checked, not the record contents.
+        ParquetInputStream inputStream = new ParquetInputStream(new 
DefaultUuidGenerator().generateUuid(), marshalled);
+        try (ParquetFileReader reader = new ParquetFileReader(inputStream, 
ParquetReadOptions.builder().build())) {
+            assertEquals(in.size(), reader.getRecordCount());
+        }
+    }
+
+    /**
+     * Builds two chained routes.
+     * <p>
+     * {@code direct:in} marshals POJOs using a format whose unmarshal type is {@code Pojo.class}.
+     * {@code direct:marshalled} then unmarshals and re-marshals with a format that has no
+     * unmarshal type.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // Step 1 (preparation): produce Parquet data from the POJOs via reflection
+                // (unmarshalType = Pojo.class).
+                ParquetAvroDataFormat format = new ParquetAvroDataFormat();
+                format.setUnmarshalType(Pojo.class);
+                from("direct:in").marshal(format).to("direct:marshalled");
+
+                // Step 2: with no unmarshal type set, unmarshal the Parquet data into Avro
+                // GenericRecords, then marshal those GenericRecords back to Parquet.
+                ParquetAvroDataFormat formatWithoutUnmarshalType = new 
ParquetAvroDataFormat();
+                from("direct:marshalled")
+                        
.unmarshal(formatWithoutUnmarshalType).to("mock:unmarshalled")
+                        
.marshal(formatWithoutUnmarshalType).to("mock:marshalled");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
index 7d86486496e..90899fe4f97 100644
--- 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
@@ -14,7 +14,7 @@
   },
   "properties": {
     "compressionCodecName": { "index": 0, "kind": "attribute", "displayName": 
"Compression Codec Name", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "GZIP", "description": "Compression codec to use when 
marshalling." },
-    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when unmarshalling." },
+    "unmarshalType": { "index": 1, "kind": "attribute", "displayName": 
"Unmarshal Type", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Class to use when (un)marshalling. If omitted, Parquet files 
are converted into Avro's GenericRecords for unmarshalling and input objects 
are assumed to be GenericRecords for marshalling." },
     "id": { "index": 2, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "The id of this node" }
   }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
index 334034be8e5..2524996256b 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
@@ -88,7 +88,8 @@ public class ParquetAvroDataFormat extends 
DataFormatDefinition {
     }
 
     /**
-     * Class to use when unmarshalling.
+     * Class to use when (un)marshalling. If omitted, Parquet files are converted into Avro's GenericRecords for
+     * unmarshalling and input objects are assumed to be GenericRecords for marshalling.
      */
     public void setUnmarshalTypeName(String unmarshalTypeName) {
         this.unmarshalTypeName = unmarshalTypeName;

Reply via email to