This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new a53b2d023f6 CAMEL-19586: camel-parquet-avro - Allow users to unmarshal
Parquet file into Avro's GenericRecords (#10605)
a53b2d023f6 is described below
commit a53b2d023f65f0b8aea281e9684c15a341c19781
Author: Kengo Seki <[email protected]>
AuthorDate: Sat Jul 8 00:08:34 2023 +0900
CAMEL-19586: camel-parquet-avro - Allow users to unmarshal Parquet file
into Avro's GenericRecords (#10605)
---
.../camel/catalog/dataformats/parquetAvro.json | 2 +-
.../apache/camel/catalog/models/parquetAvro.json | 2 +-
.../apache/camel/catalog/schemas/camel-spring.xsd | 5 +-
.../camel/dataformat/parquet/avro/parquetAvro.json | 2 +-
.../src/main/docs/parquetAvro-dataformat.adoc | 6 +-
.../parquet/avro/ParquetAvroDataFormat.java | 44 ++++++++++--
...quetAvroDataFormatWithoutUnmarshalTypeTest.java | 84 ++++++++++++++++++++++
.../apache/camel/model/dataformat/parquetAvro.json | 2 +-
.../model/dataformat/ParquetAvroDataFormat.java | 3 +-
9 files changed, 133 insertions(+), 17 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
index b4658eaca90..63a4b60b424 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dataformats/parquetAvro.json
@@ -17,7 +17,7 @@
},
"properties": {
"compressionCodecName": { "index": 0, "kind": "attribute", "displayName":
"Compression Codec Name", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "GZIP", "description": "Compression codec to use when
marshalling." },
- "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when unmarshalling." },
+ "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when (un)marshalling. If omitted, parquet files
are converted into Avro's GenericRecords for unmarshalling and input objects
are assumed as GenericRecords for marshalling." },
"id": { "index": 2, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The id of this node" }
}
}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
index 7d86486496e..90899fe4f97 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/parquetAvro.json
@@ -14,7 +14,7 @@
},
"properties": {
"compressionCodecName": { "index": 0, "kind": "attribute", "displayName":
"Compression Codec Name", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "GZIP", "description": "Compression codec to use when
marshalling." },
- "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when unmarshalling." },
+ "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when (un)marshalling. If omitted, parquet files
are converted into Avro's GenericRecords for unmarshalling and input objects
are assumed as GenericRecords for marshalling." },
"id": { "index": 2, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The id of this node" }
}
}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 52d3831c7a7..36d9129b9a1 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -8743,7 +8743,8 @@ Compression codec to use when marshalling. Default value:
GZIP
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
-Class to use when unmarshalling.
+Class to use when (un)marshalling. If omitted, parquet files are converted
into Avro's GenericRecords for unmarshalling
+and input objects are assumed as GenericRecords for marshalling.
]]>
</xs:documentation>
</xs:annotation>
@@ -17202,4 +17203,4 @@ An optional certificate alias to use. This is useful
when the keystore has multi
<xs:enumeration value="TransactionErrorHandler"/>
</xs:restriction>
</xs:simpleType>
-</xs:schema>
\ No newline at end of file
+</xs:schema>
diff --git
a/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
b/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
index b4658eaca90..63a4b60b424 100644
---
a/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
+++
b/components/camel-parquet-avro/src/generated/resources/org/apache/camel/dataformat/parquet/avro/parquetAvro.json
@@ -17,7 +17,7 @@
},
"properties": {
"compressionCodecName": { "index": 0, "kind": "attribute", "displayName":
"Compression Codec Name", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "GZIP", "description": "Compression codec to use when
marshalling." },
- "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when unmarshalling." },
+ "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when (un)marshalling. If omitted, parquet files
are converted into Avro's GenericRecords for unmarshalling and input objects
are assumed as GenericRecords for marshalling." },
"id": { "index": 2, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The id of this node" }
}
}
diff --git
a/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
b/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
index c73f0d3785a..6384cb17e8c 100644
--- a/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
+++ b/components/camel-parquet-avro/src/main/docs/parquetAvro-dataformat.adoc
@@ -11,7 +11,7 @@
*Since Camel {since}*
-The ParquetAvro Data Format is a Camel Framework's data format implementation
based on parquet-avro library for (de)/serialization purposes. Messages can be
unmarshalled (conversion to simple Java POJO(s)) to plain Java objects. By the
help of Camel's routing engine and data transformations you can then play with
POJO(s) and apply customised formatting and call other Camel Component's to
convert and send messages to upstream systems.
+The ParquetAvro Data Format is a Camel data format implementation based on the parquet-avro library for (de)serialization purposes. Messages can be unmarshalled to Avro's GenericRecords or to plain Java objects (POJOs). With the help of Camel's routing engine and data transformations, you can then work with them, apply customised formatting, and call other Camel components to convert and send messages to upstream systems.
== Parquet Data Format Options
@@ -23,7 +23,7 @@ include::partial$dataformat-options.adoc[]
There are ways to unmarshal parquet files/structures (Usually binary parquet
files) where camel DSL allows
-In this first example we unmarshal file payload to OutputStream and send it to
mock endpoint, then we will be able to get Pojo (it could be a list if that is
coming through)
+In this first example, we unmarshal the file payload and send it to a mock endpoint, from which we can then get GenericRecords or POJOs (possibly as a list, depending on what comes through).
[source,java]
-----------------------------------------------------------------------
@@ -32,7 +32,7 @@
from("direct:unmarshal").unmarshal(parquet).to("mock:unmarshal");
== Marshal
-Marshalling is the reverse process of unmarshalling so when you have your Pojo
and marshal it you will get the parquet formatted output stream on your
producer endpoint.
+Marshalling is the reverse process of unmarshalling: when you marshal your GenericRecords or POJOs, you get a Parquet-formatted output stream on your producer endpoint.
[source,java]
-----------------------------------------------------------------------
diff --git
a/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
b/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
index d150492f6a7..6d7a4f7b7d2 100644
---
a/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
+++
b/components/camel-parquet-avro/src/main/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormat.java
@@ -22,6 +22,11 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.camel.Exchange;
import org.apache.camel.spi.DataFormat;
@@ -36,6 +41,8 @@ import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
@@ -43,6 +50,8 @@ import static
org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
@Dataformat("parquetAvro")
public class ParquetAvroDataFormat extends ServiceSupport implements
DataFormat, DataFormatName {
+ private static final Logger LOG =
LoggerFactory.getLogger(ParquetAvroDataFormat.class);
+
private static final DefaultUuidGenerator DEFAULT_UUID_GENERATOR = new
DefaultUuidGenerator();
private CompressionCodecName compressionCodecName = GZIP;
@@ -54,7 +63,7 @@ public class ParquetAvroDataFormat extends ServiceSupport
implements DataFormat,
}
public void marshal(Exchange exchange, Object graph, OutputStream stream)
throws Exception {
- // marshal from the Java object (graph) to the parquet-avro type
+ // marshal from the Java object or GenericRecord (graph) to the
parquet-avro type
Configuration conf = new Configuration();
FileSystem.get(conf).setWriteChecksum(false);
@@ -66,9 +75,24 @@ public class ParquetAvroDataFormat extends ServiceSupport
implements DataFormat,
List<?> list = (List<?>) graph;
+ Schema schema = null;
+ GenericData model = null;
+ if (unmarshalType != null) {
+ try {
+ schema = ReflectData.AllowNull.get().getSchema(unmarshalType);
// generate nullable fields
+ model = ReflectData.get();
+ } catch (AvroRuntimeException e) {
+ LOG.warn("Fall back to use GenericRecord instead of POJO for
marshalling", e);
+ }
+ }
+ if (schema == null) {
+ schema = GenericContainer.class.cast(list.get(0)).getSchema();
+ model = GenericData.get();
+ }
+
try (ParquetWriter writer =
AvroParquetWriter.builder(parquetOutputStream)
-
.withSchema(ReflectData.AllowNull.get().getSchema(unmarshalType)) // generate
nullable fields
- .withDataModel(ReflectData.get())
+ .withSchema(schema)
+ .withDataModel(model)
.withConf(conf)
.withCompressionCodec(compressionCodecName)
.withWriteMode(OVERWRITE)
@@ -80,7 +104,7 @@ public class ParquetAvroDataFormat extends ServiceSupport
implements DataFormat,
}
public Object unmarshal(Exchange exchange, InputStream stream) throws
Exception {
- // unmarshal from the input stream of parquet-avro to Java object
(graph)
+ // unmarshal from the input stream of parquet-avro to Java object or
GenericRecord (graph)
List<Object> parquetObjects = new ArrayList<>();
Configuration conf = new Configuration();
@@ -88,14 +112,20 @@ public class ParquetAvroDataFormat extends ServiceSupport
implements DataFormat,
DEFAULT_UUID_GENERATOR.generateUuid(),
stream.readAllBytes());
+ Class<?> type = GenericRecord.class;
+ GenericData model = GenericData.get();
+ if (unmarshalType != null) {
+ type = unmarshalType;
+ model = new ReflectData(unmarshalType.getClassLoader());
+ }
+
try (ParquetReader reader =
AvroParquetReader.builder(parquetInputStream)
- .withDataModel(new ReflectData(unmarshalType.getClassLoader()))
+ .withDataModel(model)
.disableCompatibility() // always use this (since this is a
new project)
.withConf(conf)
.build()) {
-
Object pojo;
- while ((pojo = unmarshalType.cast(reader.read())) != null) {
+ while ((pojo = type.cast(reader.read())) != null) {
parquetObjects.add(pojo);
}
}
diff --git
a/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
b/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
new file mode 100644
index 00000000000..4c909673815
--- /dev/null
+++
b/components/camel-parquet-avro/src/test/java/org/apache/camel/dataformat/parquet/avro/ParquetAvroDataFormatWithoutUnmarshalTypeTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.parquet.avro;
+
+import java.util.List;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.DefaultUuidGenerator;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ParquetAvroDataFormatWithoutUnmarshalTypeTest extends
CamelTestSupport {
+
+ @Test
+ public void testMarshalAndUnmarshalMapWithoutUnmarshalType() throws
Exception {
+ List<Pojo> in = List.of(
+ new Pojo(1, "airport"),
+ new Pojo(2, "penguin"),
+ new Pojo(3, "verb"));
+ MockEndpoint unmarshalMock = getMockEndpoint("mock:unmarshalled");
+ unmarshalMock.expectedMessageCount(1);
+
+ MockEndpoint marshalMock = getMockEndpoint("mock:marshalled");
+ marshalMock.expectedMessageCount(1);
+
+ template.sendBody("direct:in", in);
+ unmarshalMock.assertIsSatisfied();
+ marshalMock.assertIsSatisfied();
+
+ List<GenericRecord> records =
unmarshalMock.getExchanges().get(0).getMessage().getBody(List.class);
+ assertEquals(in.size(), records.size());
+ for (int i = 0; i < records.size(); i++) {
+ GenericRecord record = GenericRecord.class.cast(records.get(i));
+ assertEquals(in.get(i).getId(), record.get("id"));
+ assertEquals(in.get(i).getData(), record.get("data").toString());
+ }
+
+ byte[] marshalled =
marshalMock.getExchanges().get(0).getIn().getBody(byte[].class);
+ ParquetInputStream inputStream = new ParquetInputStream(new
DefaultUuidGenerator().generateUuid(), marshalled);
+ try (ParquetFileReader reader = new ParquetFileReader(inputStream,
ParquetReadOptions.builder().build())) {
+ assertEquals(in.size(), reader.getRecordCount());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ // First we get a Parquet data from POJO using reflection as
preparation
+ ParquetAvroDataFormat format = new ParquetAvroDataFormat();
+ format.setUnmarshalType(Pojo.class);
+ from("direct:in").marshal(format).to("direct:marshalled");
+
+ // Then we ensure that data can be unmarshalled and marshalled
again with Avro's GenericRecord
+ ParquetAvroDataFormat formatWithoutUnmarshalType = new
ParquetAvroDataFormat();
+ from("direct:marshalled")
+
.unmarshal(formatWithoutUnmarshalType).to("mock:unmarshalled")
+
.marshal(formatWithoutUnmarshalType).to("mock:marshalled");
+ }
+ };
+ }
+}
diff --git
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
index 7d86486496e..90899fe4f97 100644
---
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
+++
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/dataformat/parquetAvro.json
@@ -14,7 +14,7 @@
},
"properties": {
"compressionCodecName": { "index": 0, "kind": "attribute", "displayName":
"Compression Codec Name", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "GZIP", "description": "Compression codec to use when
marshalling." },
- "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when unmarshalling." },
+ "unmarshalType": { "index": 1, "kind": "attribute", "displayName":
"Unmarshal Type", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Class to use when (un)marshalling. If omitted, parquet files
are converted into Avro's GenericRecords for unmarshalling and input objects
are assumed as GenericRecords for marshalling." },
"id": { "index": 2, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "The id of this node" }
}
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
index 334034be8e5..2524996256b 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/dataformat/ParquetAvroDataFormat.java
@@ -88,7 +88,8 @@ public class ParquetAvroDataFormat extends
DataFormatDefinition {
}
/**
- * Class to use when unmarshalling.
+ * Class to use when (un)marshalling. If omitted, parquet files are
converted into Avro's GenericRecords for
+ * unmarshalling and input objects are assumed as GenericRecords for
marshalling.
*/
public void setUnmarshalTypeName(String unmarshalTypeName) {
this.unmarshalTypeName = unmarshalTypeName;