This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new ddcdac674b NIFI-12614: Create record reader service for Protobuf
messages (1.x version)
ddcdac674b is described below
commit ddcdac674bd70e2f9fc9d604de3aefe0465a4219
Author: Mark Bathori <[email protected]>
AuthorDate: Tue Jan 16 11:44:02 2024 +0100
NIFI-12614: Create record reader service for Protobuf messages (1.x version)
This closes #8626.
Signed-off-by: Tamas Palfy <[email protected]>
---
nifi-assembly/pom.xml | 6 +
.../nifi-protobuf-services-nar/pom.xml | 42 +++
.../src/main/resources/META-INF/LICENSE | 210 +++++++++++
.../src/main/resources/META-INF/NOTICE | 92 +++++
.../nifi-protobuf-services/pom.xml | 88 +++++
.../apache/nifi/services/protobuf/FieldType.java | 57 +++
.../nifi/services/protobuf/ProtobufReader.java | 177 +++++++++
.../services/protobuf/ProtobufRecordReader.java | 65 ++++
.../services/protobuf/converter/ProtoField.java | 53 +++
.../protobuf/converter/ProtobufDataConverter.java | 403 +++++++++++++++++++++
.../protobuf/schema/ProtoSchemaParser.java | 177 +++++++++
.../protobuf/schema/ProtoSchemaStrategy.java | 49 +++
.../validation/ProtoValidationResource.java | 38 ++
.../org.apache.nifi.controller.ControllerService | 16 +
.../additionalDetails.html | 189 ++++++++++
.../nifi/services/protobuf/ProtoTestUtil.java | 144 ++++++++
.../protobuf/TestProtobufRecordReader.java | 145 ++++++++
.../converter/TestProtobufDataConverter.java | 111 ++++++
.../protobuf/schema/TestProtoSchemaParser.java | 84 +++++
.../src/test/resources/google/protobuf/any.desc | 7 +
.../src/test/resources/test_proto2.desc | 11 +
.../src/test/resources/test_proto2.proto | 35 ++
.../src/test/resources/test_proto3.desc | Bin 0 -> 1022 bytes
.../src/test/resources/test_proto3.proto | 53 +++
nifi-nar-bundles/nifi-protobuf-bundle/pom.xml | 110 ++++++
nifi-nar-bundles/pom.xml | 1 +
26 files changed, 2363 insertions(+)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index d07770d8ba..8c996b5921 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1019,6 +1019,12 @@ language governing permissions and limitations under the
License. -->
<version>1.26.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-protobuf-services-nar</artifactId>
+ <version>1.26.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<!-- AspectJ library needed by the Java Agent used for native library
loading (see bootstrap.conf) -->
<dependency>
<groupId>org.aspectj</groupId>
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
new file mode 100644
index 0000000000..9d0f55fc78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- No project-level version is declared below, so it is inherited from this parent. -->
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-protobuf-bundle</artifactId>
        <version>1.26.0-SNAPSHOT</version>
    </parent>

    <!-- NiFi Archive (NAR) packaging of the Protobuf record reader services module. -->
    <artifactId>nifi-protobuf-services-nar</artifactId>
    <packaging>nar</packaging>

    <dependencies>
        <!-- Service implementation bundled into this NAR (same version as this module). -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-protobuf-services</artifactId>
            <version>${project.version}</version>
        </dependency>
        <!-- NAR-typed dependency. NOTE(review): under NiFi NAR conventions this presumably acts as the
             parent NAR that supplies the shared service APIs at runtime; confirm against the NAR plugin docs. -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-standard-shared-nar</artifactId>
            <type>nar</type>
        </dependency>
    </dependencies>

</project>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..84fb21549a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,210 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
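+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
+This product bundles 'Protocol Buffers' (protobuf-java), which is available
+under a "3-clause BSD" license.
+
+ Copyright 2008 Google Inc. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are
+ met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following disclaimer
+ in the documentation and/or other materials provided with the
+ distribution.
+ * Neither the name of Google Inc. nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+ Code generated by the Protocol Buffer compiler is owned by the owner
+ of the input file used when generating it. This code is not
+ standalone and requires a support library to be linked with it. This
+ support library is itself covered by the above license.
+
+This product bundles 'Checker Qual' (Checker Framework qualifiers), which is
+available under the MIT License.
+
+ Checker Framework qualifiers
+ Copyright 2004-present by the Checker Framework developers
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+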
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..b57ec55464
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,92 @@
+nifi-protobuf-services-nar
+Copyright 2014-2024 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+ (ASLv2) Wire
+ The following NOTICE information applies:
+ Wire
+ Copyright 2013 Square, Inc.
+
+ (ASLv2) KotlinPoet
+ The following NOTICE information applies:
+ KotlinPoet
+ Copyright 2017 Square, Inc.
+
+ (ASLv2) Guava: Google Core Libraries For Java
+ The following NOTICE information applies:
+ Guava: Google Core Libraries For Java
+ Copyright (C) 2017 The Guava Authors
+
+ (ASLv2) J2ObjC Annotations
+ The following NOTICE information applies:
+ J2ObjC Annotations
+ Copyright 2022 The J2ObjC Annotations Authors
+
+ (ASLv2) FindBugs JSR305
+ The following NOTICE information applies:
+ FindBugs JSR305
+ Copyright 2017 The FindBugs JSR305 Authors
+
+ (ASLv2) Guava ListenableFuture Only
+ The following NOTICE information applies:
+ Guava ListenableFuture Only
+ Copyright (C) 2018 The Guava Authors
+
+ (ASLv2) Error Prone Annotations
+ The following NOTICE information applies:
+ Error Prone Annotations
+ Copyright 2015 The Error Prone Authors
+
+ (ASLv2) Guava InternalFutureFailureAccess and InternalFutures
+ The following NOTICE information applies:
+ Guava InternalFutureFailureAccess and InternalFutures
+ Copyright (C) 2018 The Guava Authors
+
+ (ASLv2) Okio
+ The following NOTICE information applies:
+ Okio
+ Copyright 2013 Square, Inc.
+
+ (ASLv2) JavaPoet
+ The following NOTICE information applies:
+ JavaPoet
+ Copyright 2015 Square, Inc.
+
+ (ASLv2) Apache Commons CSV
+ The following NOTICE information applies:
+ Apache Commons CSV
+ Copyright 2005-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2017 The Apache Software Foundation
+
+===========================================
+MIT License
+===========================================
+
+ (MIT) Checker Qual
+ The following NOTICE information applies:
+ Checker Qual
+ Copyright 2004-present by the Checker Framework developers
+ All rights reserved.
+ https://www.checkerframework.org/
+
+************************
+BSD License
+************************
+
+The following binary components are provided under the BSD License. See
project link for details.
+
+ (BSD 3-Clause) Protocol Buffers
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc. All rights reserved.
+ https://github.com/google/protobuf/tree/master/java
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
new file mode 100644
index 0000000000..ace5ea01c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- No project-level version is declared below, so it is inherited from this parent. -->
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-protobuf-bundle</artifactId>
        <version>1.26.0-SNAPSHOT</version>
    </parent>

    <!-- Java implementation of the Protobuf record reader controller service. -->
    <artifactId>nifi-protobuf-services</artifactId>

    <!-- Versions of the external Protocol Buffers libraries used in the dependencies below. -->
    <properties>
        <protobuf.version>3.25.1</protobuf.version>
        <wire.version>4.9.3</wire.version>
    </properties>

    <!-- Declaration order is kept as-is: it determines classpath order. -->
    <dependencies>
        <!-- Internal dependencies. Entries without a version presumably take it from dependency
             management in a parent POM. -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-record-serialization-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-schema-registry-service-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-avro-record-utils</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- External dependencies: the Protocol Buffers runtime and Square Wire,
             whose SchemaLoader is used to load the configured .proto files. -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.wire</groupId>
            <artifactId>wire-schema-jvm</artifactId>
            <version>${wire.version}</version>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock-record-utils</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
new file mode 100644
index 0000000000..71865cad09
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import java.util.Arrays;
+
/**
 * Scalar value types of the Protocol Buffers language. Each constant carries the keyword
 * that names the type in a .proto definition.
 */
public enum FieldType {
    DOUBLE("double"),
    FLOAT("float"),
    INT32("int32"),
    INT64("int64"),
    UINT32("uint32"),
    UINT64("uint64"),
    SINT32("sint32"),
    SINT64("sint64"),
    FIXED32("fixed32"),
    FIXED64("fixed64"),
    SFIXED32("sfixed32"),
    SFIXED64("sfixed64"),
    BOOL("bool"),
    STRING("string"),
    BYTES("bytes");

    // .proto keyword of this scalar type
    private final String type;

    FieldType(final String keyword) {
        this.type = keyword;
    }

    /**
     * @return the .proto keyword of this scalar type, for example {@code "int32"}
     */
    public String getType() {
        return type;
    }

    /**
     * Resolves a .proto scalar type keyword to its constant, ignoring case.
     *
     * @param value the keyword to resolve
     * @return the first constant, in declaration order, whose keyword matches {@code value}
     * @throws IllegalArgumentException if no constant matches, including when {@code value} is null
     */
    public static FieldType findValue(final String value) {
        for (final FieldType candidate : values()) {
            if (candidate.type.equalsIgnoreCase(value)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException(String.format("ProtoType [%s] not found", value));
    }
}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
new file mode 100644
index 0000000000..85904aca54
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.CoreLoaderKt;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.SchemaLoader;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy;
+import org.apache.nifi.services.protobuf.validation.ProtoValidationResource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"protobuf", "record", "reader", "parser"})
+@CapabilityDescription("Parses a Protocol Buffers message from binary format.")
+public class ProtobufReader extends SchemaRegistryService implements
RecordReaderFactory {
+
+ private static final String ANY_PROTO = "google/protobuf/any.proto";
+ private static final String DURATION_PROTO =
"google/protobuf/duration.proto";
+ private static final String EMPTY_PROTO = "google/protobuf/empty.proto";
+ private static final String STRUCT_PROTO = "google/protobuf/struct.proto";
+ private static final String TIMESTAMP_PROTO =
"google/protobuf/timestamp.proto";
+ private static final String WRAPPERS_PROTO =
"google/protobuf/wrappers.proto";
+
+ private static final AllowableValue GENERATE_FROM_PROTO_FILE = new
AllowableValue("generate-from-proto-file",
+ "Generate from Proto file", "The record schema is generated from
the provided proto file");
+
+ private volatile String messageType;
+ private volatile Schema protoSchema;
+
+ // Holder of cached proto information so validation does not reload the
same proto file over and over
+ private final AtomicReference<ProtoValidationResource>
validationResourceHolder = new AtomicReference<>();
+
+ public static final PropertyDescriptor PROTOBUF_DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("Proto Directory")
+ .displayName("Proto Directory")
+ .description("Directory containing Protocol Buffers message
definition (.proto) file(s).")
+ .required(true)
+
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor MESSAGE_TYPE = new
PropertyDescriptor.Builder()
+ .name("Message Type")
+ .displayName("Message Type")
+ .description("Fully qualified name of the Protocol Buffers message
type including its package (eg. mypackage.MyMessage). " +
+ "The .proto files configured in '" +
PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this
message type.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(PROTOBUF_DIRECTORY);
+ properties.add(MESSAGE_TYPE);
+ return properties;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext
validationContext) {
+ final List<ValidationResult> problems = new ArrayList<>();
+ final String protoDirectory =
validationContext.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
+ final String messageType =
validationContext.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
+
+ if (protoDirectory != null && messageType != null) {
+ final Schema protoSchema = getSchemaForValidation(protoDirectory);
+ if (protoSchema.getType(messageType) == null) {
+ problems.add(new ValidationResult.Builder()
+ .subject(MESSAGE_TYPE.getDisplayName())
+ .valid(false)
+ .explanation(String.format("'%s' message type cannot
be found in the provided proto files.", messageType))
+ .build());
+ }
+ }
+
+ return problems;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final String protoDirectory =
context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
+ messageType =
context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
+ protoSchema = loadProtoSchema(protoDirectory);
+ }
+
+ @Override
+ protected SchemaAccessStrategy getSchemaAccessStrategy(final String
allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext
context) {
+ if
(allowableValue.equalsIgnoreCase(GENERATE_FROM_PROTO_FILE.getValue())) {
+ return new ProtoSchemaStrategy(messageType, protoSchema);
+ }
+
+ return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue,
schemaRegistry, context);
+ }
+
+ @Override
+ protected List<AllowableValue> getSchemaAccessStrategyValues() {
+ final List<AllowableValue> allowableValues = new
ArrayList<>(super.getSchemaAccessStrategyValues());
+ allowableValues.add(GENERATE_FROM_PROTO_FILE);
+ return allowableValues;
+ }
+
+ @Override
+ protected AllowableValue getDefaultSchemaAccessStrategy() {
+ return GENERATE_FROM_PROTO_FILE;
+ }
+
+ @Override
+ public RecordReader createRecordReader(Map<String, String> variables,
InputStream in, long inputLength, ComponentLog logger) throws IOException,
SchemaNotFoundException {
+ return new ProtobufRecordReader(protoSchema, messageType, in,
getSchema(variables, in, null));
+ }
+
+ private Schema loadProtoSchema(final String protoDirectory) {
+ final SchemaLoader schemaLoader = new
SchemaLoader(FileSystems.getDefault());
+ schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO),
+ Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)),
Collections.emptyList());
+ return schemaLoader.loadSchema();
+ }
+
+ private Schema getSchemaForValidation(final String protoDirectory) {
+ ProtoValidationResource validationResource =
validationResourceHolder.get();
+ if (validationResource == null ||
!protoDirectory.equals(validationResource.getProtoDirectory())) {
+ validationResource = new ProtoValidationResource(protoDirectory,
loadProtoSchema(protoDirectory));
+ validationResourceHolder.set(validationResource);
+ }
+
+ return validationResource.getProtoSchema();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
new file mode 100644
index 0000000000..ab2d6fdbd7
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.protobuf.converter.ProtobufDataConverter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link RecordReader} that decodes the whole input stream as one protobuf message of the configured type, so it
+ * yields at most one record.
+ */
+public class ProtobufRecordReader implements RecordReader {
+
+    private final Schema protoSchema;
+    private final String messageType;
+    private final InputStream inputStream;
+    private RecordSchema recordSchema;
+    private boolean inputProcessed;
+
+    public ProtobufRecordReader(Schema protoSchema, String messageType, InputStream inputStream, RecordSchema recordSchema) {
+        this.protoSchema = protoSchema;
+        this.messageType = messageType;
+        this.inputStream = inputStream;
+        this.recordSchema = recordSchema;
+    }
+
+    /**
+     * Returns the single record decoded from the input on the first call and {@code null} afterwards.
+     *
+     * @param coerceTypes whether values are coerced to the record schema's field types
+     * @param dropUnknownFields whether fields missing from the record schema are dropped
+     * @return the decoded record, or {@code null} once the input has been consumed
+     * @throws IOException failed to read or parse the input
+     */
+    @Override
+    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException {
+        if (inputProcessed) {
+            return null;
+        }
+        // Mark the input as consumed before converting: the converter drains the whole stream, so retrying after a
+        // failure would decode the leftover (typically empty) stream and return a spurious record.
+        inputProcessed = true;
+
+        final ProtobufDataConverter dataConverter = new ProtobufDataConverter(protoSchema, messageType, recordSchema, coerceTypes, dropUnknownFields);
+        final Record record = dataConverter.createRecord(inputStream);
+        recordSchema = record.getSchema();
+        return record;
+    }
+
+    /**
+     * @return the schema passed to the constructor, or the produced record's schema once a record has been read
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return recordSchema;
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputStream.close();
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
new file mode 100644
index 0000000000..1481849280
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.ProtoType;
+
+/**
+ * Immutable description of a proto field as needed for conversion: its name, its proto type and whether it is
+ * declared {@code repeated}.
+ */
+public class ProtoField {
+
+    private final String fieldName;
+    private final ProtoType protoType;
+    private final boolean repeatable;
+
+    /**
+     * Describes a field declared in a Wire schema message.
+     *
+     * @param field schema field
+     */
+    public ProtoField(Field field) {
+        this(field.getName(), field.getType(), field.isRepeated());
+    }
+
+    /**
+     * Describes a non-repeated field that is not backed by a schema {@link Field}, such as map-entry or Any members.
+     *
+     * @param fieldName field name
+     * @param protoType field type
+     */
+    public ProtoField(String fieldName, ProtoType protoType) {
+        this(fieldName, protoType, false);
+    }
+
+    private ProtoField(String name, ProtoType type, boolean repeated) {
+        fieldName = name;
+        protoType = type;
+        repeatable = repeated;
+    }
+
+    /** @return field name */
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    /** @return field's proto type */
+    public ProtoType getProtoType() {
+        return protoType;
+    }
+
+    /** @return {@code true} when the field is declared {@code repeated} */
+    public boolean isRepeatable() {
+        return repeatable;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
new file mode 100644
index 0000000000..8541f5723a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import com.squareup.wire.schema.EnumType;
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.MessageType;
+import com.squareup.wire.schema.OneOf;
+import com.squareup.wire.schema.ProtoType;
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.services.protobuf.FieldType;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.google.protobuf.CodedInputStream.decodeZigZag32;
+import static com.google.protobuf.TextFormat.unsignedToString;
+
+/**
+ * Creates a {@link MapRecord} by mapping the fields declared in the provided proto schema onto the
+ * {@link UnknownFieldSet} parsed from encoded protobuf data.
+ * <p>
+ * Field names are always looked up in the record schema of the message that declares them. Nested messages use the
+ * child schema of their enclosing field, and map values use the value type of the map field. The message packed into a
+ * {@code google.protobuf.Any} uses a schema generated from its own type.
+ */
+public class ProtobufDataConverter {
+
+    public static final String MAP_KEY_FIELD_NAME = "key";
+    public static final String MAP_VALUE_FIELD_NAME = "value";
+    public static final String ANY_TYPE_URL_FIELD_NAME = "type_url";
+    public static final String ANY_VALUE_FIELD_NAME = "value";
+    public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any";
+
+    // Wire type names used in error messages
+    private static final String LENGTH_DELIMITED = "LengthDelimited";
+    private static final String FIXED_32 = "Fixed32";
+    private static final String FIXED_64 = "Fixed64";
+    private static final String VARINT = "Varint";
+
+    private static final byte[] EMPTY_BYTES = new byte[0];
+
+    // Schema without fields, used where there is no record field to coerce to (e.g. the members of an Any message)
+    private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.<RecordField>emptyList());
+
+    private final Schema schema;
+    private final String rootMessageType;
+    private final RecordSchema rootRecordSchema;
+    private final boolean coerceTypes;
+    private final boolean dropUnknownFields;
+
+    // Set once a google.protobuf.Any message was expanded. The expanded record uses a schema generated from the packed
+    // message type, so the root record's schema is regenerated afterwards.
+    private boolean containsAnyField = false;
+
+    public ProtobufDataConverter(Schema schema, String messageType, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) {
+        this.schema = schema;
+        this.rootMessageType = messageType;
+        this.rootRecordSchema = recordSchema;
+        this.coerceTypes = coerceTypes;
+        this.dropUnknownFields = dropUnknownFields;
+    }
+
+    /**
+     * Creates a record from the root message.
+     *
+     * @param data encoded root message; the stream is read to its end
+     * @return created record
+     * @throws IOException failed to read or parse the input stream
+     * @throws NullPointerException the root message type is not present in the proto schema
+     */
+    public MapRecord createRecord(InputStream data) throws IOException {
+        final MessageType messageType = (MessageType) schema.getType(rootMessageType);
+        Objects.requireNonNull(messageType, () -> String.format("Message with name [%s] not found in the provided proto files", rootMessageType));
+
+        final MapRecord record = createRecord(messageType, ByteString.readFrom(data), rootRecordSchema);
+        if (containsAnyField) {
+            record.regenerateSchema();
+        }
+
+        return record;
+    }
+
+    /**
+     * Creates a record for the provided message.
+     *
+     * @param messageType message to create a record from
+     * @param data proto message data
+     * @param recordSchema record schema for the created record
+     * @return created record
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException {
+        final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data);
+
+        if (ANY_MESSAGE_TYPE.equals(messageType.getType().toString())) {
+            containsAnyField = true;
+            return handleAnyField(unknownFieldSet);
+        }
+
+        final Map<String, Object> fieldValues = processMessageFields(messageType, unknownFieldSet, recordSchema);
+        return new MapRecord(recordSchema, fieldValues, false, dropUnknownFields);
+    }
+
+    /**
+     * Processes the declared, extension and oneOf fields of the provided message.
+     *
+     * @param messageType message with fields to be processed
+     * @param unknownFieldSet received proto data fields
+     * @param recordSchema record schema of the message, used for child schemas and coercion
+     * @return map of field name to converted value, containing only fields present in the data
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet, RecordSchema recordSchema)
+            throws InvalidProtocolBufferException {
+        final Map<String, Object> recordValues = new HashMap<>();
+
+        for (final Field field : messageType.getDeclaredFields()) {
+            collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+        }
+
+        for (final Field field : messageType.getExtensionFields()) {
+            collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+        }
+
+        for (final OneOf oneOf : messageType.getOneOfs()) {
+            for (final Field field : oneOf.getFields()) {
+                collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()), recordSchema);
+            }
+        }
+        return recordValues;
+    }
+
+    /**
+     * Converts the field's value and, when the field is present in the data, stores it under the field's name.
+     *
+     * @param fieldNameToConvertedValue map of converted values
+     * @param protoField proto field's properties
+     * @param unknownField field's value
+     * @param recordSchema record schema of the message declaring the field
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private void collectFieldValue(Map<String, Object> fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField,
+                                   RecordSchema recordSchema) throws InvalidProtocolBufferException {
+        final Optional<Object> fieldValue = convertFieldValues(protoField, unknownField, recordSchema);
+        fieldValue.ifPresent(value -> fieldNameToConvertedValue.put(protoField.getFieldName(), value));
+    }
+
+    /**
+     * Dispatches on the wire type the field's values were received with.
+     *
+     * @return the converted value, or empty when the field is not present in the data
+     */
+    private Optional<Object> convertFieldValues(ProtoField protoField, UnknownFieldSet.Field unknownField, RecordSchema recordSchema)
+            throws InvalidProtocolBufferException {
+        if (!unknownField.getLengthDelimitedList().isEmpty()) {
+            return Optional.of(convertLengthDelimitedFields(protoField, unknownField.getLengthDelimitedList(), recordSchema));
+        }
+        if (!unknownField.getFixed32List().isEmpty()) {
+            return Optional.of(convertFixed32Fields(protoField, unknownField.getFixed32List(), recordSchema));
+        }
+        if (!unknownField.getFixed64List().isEmpty()) {
+            return Optional.of(convertFixed64Fields(protoField, unknownField.getFixed64List(), recordSchema));
+        }
+        if (!unknownField.getVarintList().isEmpty()) {
+            return Optional.of(convertVarintFields(protoField, unknownField.getVarintList(), recordSchema));
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Converts Length-Delimited field values into their suitable data type. Strings, bytes, maps and embedded
+     * messages are Length-Delimited by nature; repeated numeric and enum fields may also arrive "packed" this way.
+     *
+     * @param protoField proto field's properties
+     * @param values field's unprocessed values
+     * @param recordSchema record schema of the message declaring the field
+     * @return converted field values
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values, RecordSchema recordSchema)
+            throws InvalidProtocolBufferException {
+        final ProtoType protoType = protoField.getProtoType();
+
+        if (protoType.isMap()) {
+            return createMap(protoField, values, recordSchema);
+        }
+
+        if (protoType.isScalar()) {
+            final Function<ByteString, Object> valueConverter;
+            switch (FieldType.findValue(protoType.getSimpleName())) {
+                case STRING:
+                    valueConverter = ByteString::toStringUtf8;
+                    break;
+                case BYTES:
+                    valueConverter = ByteString::toByteArray;
+                    break;
+                default:
+                    if (protoField.isRepeatable()) {
+                        return convertPackedFields(protoField, values, recordSchema);
+                    }
+                    throw incompatibleWireType(protoField, protoType.getSimpleName(), LENGTH_DELIMITED);
+            }
+            return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+        }
+
+        if (schema.getType(protoType) instanceof EnumType) {
+            // a single enum value is always a varint; only repeated enums can be packed
+            if (protoField.isRepeatable()) {
+                return convertPackedFields(protoField, values, recordSchema);
+            }
+            throw incompatibleWireType(protoField, protoType.getSimpleName(), LENGTH_DELIMITED);
+        }
+
+        return convertMessageFields(protoField, values, recordSchema);
+    }
+
+    /**
+     * Converts embedded message values into records.
+     *
+     * @param protoField proto field's properties; its type is a message type
+     * @param values encoded messages
+     * @param recordSchema record schema of the message declaring the field
+     * @return converted record, or array of records for repeated fields
+     */
+    private Object convertMessageFields(ProtoField protoField, List<ByteString> values, RecordSchema recordSchema) {
+        final ProtoType protoType = protoField.getProtoType();
+        final MessageType messageType = (MessageType) schema.getType(protoType);
+        Objects.requireNonNull(messageType, () -> String.format("Message type with name [%s] not found in the provided proto files", protoType));
+
+        final RecordSchema childSchema = resolveChildSchema(recordSchema, protoField.getFieldName(), messageType);
+        final Function<ByteString, Object> valueConverter = value -> {
+            try {
+                return createRecord(messageType, value, childSchema);
+            } catch (InvalidProtocolBufferException e) {
+                throw new IllegalStateException("Failed to create record from the provided input data for field " + protoField.getFieldName(), e);
+            }
+        };
+
+        // Occurrences of a non-repeated message field must be merged; parsing their concatenation performs that merge
+        final List<ByteString> encodedMessages = protoField.isRepeatable() || values.size() == 1
+                ? values
+                : Collections.singletonList(ByteString.copyFrom(values));
+        return resolveFieldValue(protoField, encodedMessages, valueConverter, recordSchema);
+    }
+
+    /**
+     * Decodes packed repeated numeric or enum fields. Their elements are written back-to-back, without individual
+     * tags, inside one or more Length-Delimited records and are unpacked according to the element's wire type.
+     *
+     * @param protoField proto field's properties
+     * @param values packed records
+     * @param recordSchema record schema of the message declaring the field
+     * @return array of converted values
+     * @throws InvalidProtocolBufferException packed data is truncated or malformed
+     */
+    private Object convertPackedFields(ProtoField protoField, List<ByteString> values, RecordSchema recordSchema)
+            throws InvalidProtocolBufferException {
+        final ProtoType protoType = protoField.getProtoType();
+        // enums are the only non-scalar packable type; they use the varint encoding
+        final FieldType fieldType = protoType.isScalar() ? FieldType.findValue(protoType.getSimpleName()) : null;
+        try {
+            if (fieldType == FieldType.FIXED32 || fieldType == FieldType.SFIXED32 || fieldType == FieldType.FLOAT) {
+                final List<Integer> unpacked = new ArrayList<>();
+                for (final ByteString packed : values) {
+                    final CodedInputStream input = packed.newCodedInput();
+                    while (!input.isAtEnd()) {
+                        unpacked.add(input.readRawLittleEndian32());
+                    }
+                }
+                return convertFixed32Fields(protoField, unpacked, recordSchema);
+            }
+
+            if (fieldType == FieldType.FIXED64 || fieldType == FieldType.SFIXED64 || fieldType == FieldType.DOUBLE) {
+                final List<Long> unpacked = new ArrayList<>();
+                for (final ByteString packed : values) {
+                    final CodedInputStream input = packed.newCodedInput();
+                    while (!input.isAtEnd()) {
+                        unpacked.add(input.readRawLittleEndian64());
+                    }
+                }
+                return convertFixed64Fields(protoField, unpacked, recordSchema);
+            }
+
+            final List<Long> unpacked = new ArrayList<>();
+            for (final ByteString packed : values) {
+                final CodedInputStream input = packed.newCodedInput();
+                while (!input.isAtEnd()) {
+                    unpacked.add(input.readRawVarint64());
+                }
+            }
+            return convertVarintFields(protoField, unpacked, recordSchema);
+        } catch (InvalidProtocolBufferException e) {
+            throw e;
+        } catch (IOException e) {
+            final InvalidProtocolBufferException parseFailure = new InvalidProtocolBufferException(e.getMessage());
+            parseFailure.initCause(e);
+            throw parseFailure;
+        }
+    }
+
+    /**
+     * Converts Fixed32 field values into their suitable data type.
+     *
+     * @param protoField proto field's properties
+     * @param values field's unprocessed values
+     * @param recordSchema record schema of the message declaring the field
+     * @return converted field values
+     */
+    private Object convertFixed32Fields(ProtoField protoField, List<Integer> values, RecordSchema recordSchema) {
+        final String typeName = protoField.getProtoType().getSimpleName();
+        final Function<Integer, Object> valueConverter;
+        switch (FieldType.findValue(typeName)) {
+            case FIXED32:
+                valueConverter = Integer::toUnsignedLong;
+                break;
+            case SFIXED32:
+                valueConverter = value -> value;
+                break;
+            case FLOAT:
+                valueConverter = Float::intBitsToFloat;
+                break;
+            default:
+                throw incompatibleWireType(protoField, typeName, FIXED_32);
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts Fixed64 field values into their suitable data type.
+     *
+     * @param protoField proto field's properties
+     * @param values field's unprocessed values
+     * @param recordSchema record schema of the message declaring the field
+     * @return converted field values
+     */
+    private Object convertFixed64Fields(ProtoField protoField, List<Long> values, RecordSchema recordSchema) {
+        final String typeName = protoField.getProtoType().getSimpleName();
+        final Function<Long, Object> valueConverter;
+        switch (FieldType.findValue(typeName)) {
+            case FIXED64:
+                valueConverter = value -> new BigInteger(unsignedToString(value));
+                break;
+            case SFIXED64:
+                valueConverter = value -> value;
+                break;
+            case DOUBLE:
+                valueConverter = Double::longBitsToDouble;
+                break;
+            default:
+                throw incompatibleWireType(protoField, typeName, FIXED_64);
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts Varint field values into their suitable data type; non-scalar varints are enum numbers.
+     *
+     * @param protoField proto field's properties
+     * @param values field's unprocessed values
+     * @param recordSchema record schema of the message declaring the field
+     * @return converted field values
+     * @throws IllegalStateException the enum has no constant with a received number
+     */
+    private Object convertVarintFields(ProtoField protoField, List<Long> values, RecordSchema recordSchema) {
+        final ProtoType protoType = protoField.getProtoType();
+        final Function<Long, Object> valueConverter;
+        if (protoType.isScalar()) {
+            switch (FieldType.findValue(protoType.getSimpleName())) {
+                case BOOL:
+                    // protobuf decoders treat any non-zero varint as true, not only 1
+                    valueConverter = value -> value != 0L;
+                    break;
+                case INT32:
+                case SFIXED32:
+                    valueConverter = Long::intValue;
+                    break;
+                case UINT32:
+                case INT64:
+                case SFIXED64:
+                    valueConverter = value -> value;
+                    break;
+                case UINT64:
+                    valueConverter = value -> new BigInteger(unsignedToString(value));
+                    break;
+                case SINT32:
+                    valueConverter = value -> decodeZigZag32(value.intValue());
+                    break;
+                case SINT64:
+                    valueConverter = CodedInputStream::decodeZigZag64;
+                    break;
+                default:
+                    throw incompatibleWireType(protoField, protoType.getSimpleName(), VARINT);
+            }
+        } else {
+            final EnumType enumType = (EnumType) schema.getType(protoType);
+            Objects.requireNonNull(enumType, () -> String.format("Enum with name [%s] not found in the provided proto files", protoType));
+            valueConverter = value -> Optional.ofNullable(enumType.constant(value.intValue()))
+                    .map(constant -> constant.getName())
+                    .orElseThrow(() -> new IllegalStateException(String.format("Enum [%s] has no constant with value [%d] for field [%s]",
+                            protoType, value, protoField.getFieldName())));
+        }
+
+        return resolveFieldValue(protoField, values, valueConverter, recordSchema);
+    }
+
+    /**
+     * Converts the raw values of a field, optionally coerces them to the field's record type and shapes the result.
+     *
+     * @param protoField proto field's properties
+     * @param values field's unprocessed values; non-empty for non-repeated fields
+     * @param valueConverter converts one raw value
+     * @param recordSchema record schema of the message declaring the field
+     * @return an array of values for repeated fields, otherwise the single (last) value
+     */
+    private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> valueConverter, RecordSchema recordSchema) {
+        List<Object> resultValues = values.stream().map(valueConverter).collect(Collectors.toList());
+
+        if (coerceTypes) {
+            final Optional<RecordField> recordField = recordSchema.getField(protoField.getFieldName());
+            if (recordField.isPresent()) {
+                final String fieldName = recordField.get().getFieldName();
+                final DataType fieldDataType = recordField.get().getDataType();
+                // elements of a repeated field are coerced to the array's element type, not to the array type itself
+                final DataType targetType = protoField.isRepeatable() ? elementTypeOf(fieldDataType) : fieldDataType;
+                resultValues = resultValues.stream()
+                        .map(value -> DataTypeUtils.convertType(value, targetType, fieldName))
+                        .collect(Collectors.toList());
+            }
+        }
+
+        if (protoField.isRepeatable()) {
+            return resultValues.toArray();
+        }
+        // a non-repeated field may be encoded more than once; protobuf parsers keep the last occurrence
+        return resultValues.get(resultValues.size() - 1);
+    }
+
+    /**
+     * Handles Map type creation in the record.
+     *
+     * @param protoField map field's properties
+     * @param data encoded map entries
+     * @param recordSchema record schema of the message declaring the map field
+     * @return created Map; keys are the string form of the entry keys
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private Map<String, Object> createMap(ProtoField protoField, List<ByteString> data, RecordSchema recordSchema) throws InvalidProtocolBufferException {
+        final ProtoType protoType = protoField.getProtoType();
+        final RecordSchema entrySchema = createMapEntrySchema(recordSchema.getDataType(protoField.getFieldName())
+                .map(ProtobufDataConverter::elementTypeOf)
+                .orElse(null));
+        final Map<String, Object> mapResult = new HashMap<>();
+
+        for (final ByteString entry : data) {
+            final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(entry);
+            final Map<String, Object> mapEntry = new HashMap<>();
+
+            collectFieldValue(mapEntry, new ProtoField(MAP_KEY_FIELD_NAME, protoType.getKeyType()), unknownFieldSet.getField(1), entrySchema);
+            collectFieldValue(mapEntry, new ProtoField(MAP_VALUE_FIELD_NAME, protoType.getValueType()), unknownFieldSet.getField(2), entrySchema);
+
+            // a map entry is encoded like a message, so a key equal to its type's default value may be absent
+            final Object key = mapEntry.get(MAP_KEY_FIELD_NAME);
+            final String mapKey = key == null ? defaultMapKey(protoType.getKeyType()) : String.valueOf(key);
+            mapResult.put(mapKey, mapEntry.get(MAP_VALUE_FIELD_NAME));
+        }
+
+        return mapResult;
+    }
+
+    /**
+     * Builds the schema used while converting one map entry: a single {@code value} field typed with the map's value
+     * type, or no fields when the map's data type is unknown. The key is always rendered as a string, so it gets no
+     * coercion target.
+     */
+    private static RecordSchema createMapEntrySchema(DataType mapDataType) {
+        if (!(mapDataType instanceof MapDataType)) {
+            return EMPTY_SCHEMA;
+        }
+        final RecordField valueField = new RecordField(MAP_VALUE_FIELD_NAME, ((MapDataType) mapDataType).getValueType());
+        return new SimpleRecordSchema(Collections.singletonList(valueField));
+    }
+
+    /**
+     * String form of the default value of a map key type: "" for strings, "false" for booleans, "0" for integers.
+     */
+    private static String defaultMapKey(ProtoType keyType) {
+        switch (FieldType.findValue(keyType.getSimpleName())) {
+            case STRING:
+                return "";
+            case BOOL:
+                return String.valueOf(false);
+            default:
+                return "0";
+        }
+    }
+
+    /**
+     * Finds the record schema for an embedded message field. The field's data type is looked up in the schema of the
+     * enclosing message, unwrapping arrays for repeated fields. When no record child schema is available, one is
+     * generated from the proto message definition.
+     */
+    private RecordSchema resolveChildSchema(RecordSchema parentSchema, String fieldName, MessageType messageType) {
+        return parentSchema.getDataType(fieldName)
+                .map(ProtobufDataConverter::elementTypeOf)
+                .filter(RecordDataType.class::isInstance)
+                .map(dataType -> ((RecordDataType) dataType).getChildSchema())
+                .orElseGet(() -> generateRecordSchema(messageType.getType().toString()));
+    }
+
+    /**
+     * Returns the element type of an array data type, or the given type itself otherwise.
+     */
+    private static DataType elementTypeOf(DataType dataType) {
+        return dataType instanceof ArrayDataType ? ((ArrayDataType) dataType).getElementType() : dataType;
+    }
+
+    /**
+     * Processes a 'google.protobuf.Any' typed message. The message type named in the 'type_url' property is looked up
+     * in the schema, and the serialized message in the 'value' field is parsed with it. The result record contains only
+     * the parsed message's fields.
+     *
+     * @param unknownFieldSet 'google.protobuf.Any' typed message's field list
+     * @return created record from the parsed message
+     * @throws InvalidProtocolBufferException failed to parse input data
+     */
+    private MapRecord handleAnyField(UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException {
+        final Map<String, Object> anyValues = new HashMap<>();
+        collectFieldValue(anyValues, new ProtoField(ANY_TYPE_URL_FIELD_NAME, ProtoType.STRING), unknownFieldSet.getField(1), EMPTY_SCHEMA);
+        collectFieldValue(anyValues, new ProtoField(ANY_VALUE_FIELD_NAME, ProtoType.BYTES), unknownFieldSet.getField(2), EMPTY_SCHEMA);
+
+        final String typeName = String.valueOf(anyValues.get(ANY_TYPE_URL_FIELD_NAME));
+        // proto3 omits empty bytes, so an Any packing a message whose fields all hold defaults has no 'value' field
+        final byte[] packedMessage = (byte[]) anyValues.getOrDefault(ANY_VALUE_FIELD_NAME, EMPTY_BYTES);
+        final UnknownFieldSet anyFieldSet = UnknownFieldSet.parseFrom(packedMessage);
+        final MessageType messageType = (MessageType) schema.getType(getQualifiedTypeName(typeName));
+        Objects.requireNonNull(messageType, () -> String.format("Message type with name [%s] not found in the provided proto files", typeName));
+
+        final RecordSchema anyRecordSchema = generateRecordSchema(typeName);
+        return new MapRecord(anyRecordSchema, processMessageFields(messageType, anyFieldSet, anyRecordSchema), false, dropUnknownFields);
+    }
+
+    /**
+     * Generates a schema for the provided message type.
+     *
+     * @param typeName name of the message, optionally prefixed by a type URL
+     * @return generated schema
+     */
+    private RecordSchema generateRecordSchema(String typeName) {
+        final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
+        return schemaParser.createSchema(getQualifiedTypeName(typeName));
+    }
+
+    /**
+     * Gets the fully qualified name of the message type by dropping everything up to the last '/'.
+     *
+     * @param typeName name of the message
+     * @return fully qualified name of the message type
+     */
+    private String getQualifiedTypeName(String typeName) {
+        return typeName.substring(typeName.lastIndexOf('/') + 1);
+    }
+
+    private static IllegalStateException incompatibleWireType(ProtoField protoField, String typeName, String wireType) {
+        return new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
+                " [%s] is not %s field type", protoField.getFieldName(), typeName, wireType));
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
new file mode 100644
index 0000000000..8d9e997b6c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.schema;
+
+import com.squareup.wire.schema.EnumConstant;
+import com.squareup.wire.schema.EnumType;
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.MessageType;
+import com.squareup.wire.schema.OneOf;
+import com.squareup.wire.schema.ProtoType;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.Type;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.EnumDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.services.protobuf.FieldType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a {@link RecordSchema} for the provided proto schema.
+ * <p>
+ * Declared, extension and oneof fields of a message are mapped to record fields, and nested message types are
+ * expanded recursively. NOTE(review): there is no cycle detection, so a message type that contains itself (directly
+ * or transitively) recurses without bound.
+ */
+public class ProtoSchemaParser {
+
+    private final Schema schema;
+
+    public ProtoSchemaParser(Schema schema) {
+        this.schema = schema;
+    }
+
+    /**
+     * Creates a {@link RecordSchema} for the provided message type.
+     *
+     * @param messageTypeName proto message type
+     * @return record schema
+     * @throws NullPointerException the message type is not present in the proto schema
+     */
+    public RecordSchema createSchema(String messageTypeName) {
+        final MessageType messageType = (MessageType) schema.getType(messageTypeName);
+        Objects.requireNonNull(messageType, () -> String.format("Message type with name [%s] not found in the provided proto files", messageTypeName));
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        recordFields.addAll(processFields(messageType.getDeclaredFields()));
+        recordFields.addAll(processFields(messageType.getExtensionFields()));
+        recordFields.addAll(processOneOfFields(messageType));
+
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    /**
+     * Maps every field of every oneof in the given message type. These fields are always nullable.
+     *
+     * @param messageType message type
+     * @return record fields generated from the oneof fields
+     */
+    private List<RecordField> processOneOfFields(MessageType messageType) {
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final OneOf oneOf : messageType.getOneOfs()) {
+            for (final Field field : oneOf.getFields()) {
+                final DataType dataType = getDataTypeForField(field.getType());
+                recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), true));
+            }
+        }
+
+        return recordFields;
+    }
+
+    /**
+     * Maps the given fields. Repeated fields become arrays of their element type; only {@code required} fields are
+     * non-nullable.
+     *
+     * @param fields fields to map
+     * @return record fields generated from the provided fields
+     */
+    private List<RecordField> processFields(List<Field> fields) {
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final Field field : fields) {
+            DataType dataType = getDataTypeForField(field.getType());
+
+            if (field.isRepeated()) {
+                dataType = RecordFieldType.ARRAY.getArrayDataType(dataType);
+            }
+
+            recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), !field.isRequired()));
+        }
+
+        return recordFields;
+    }
+
+    /**
+     * Dispatches to the scalar or composite {@link DataType} mapping.
+     *
+     * @param protoType field's type
+     * @return data type
+     */
+    private DataType getDataTypeForField(ProtoType protoType) {
+        return protoType.isScalar() ? getDataTypeForScalarField(protoType) : getDataTypeForCompositeField(protoType);
+    }
+
+    /**
+     * Gets the suitable {@link DataType} for a map, message or enum field.
+     *
+     * @param protoType field's type
+     * @return data type
+     * @throws IllegalStateException the type is missing from the proto schema or is of an unsupported kind
+     */
+    private DataType getDataTypeForCompositeField(ProtoType protoType) {
+        if (protoType.isMap()) {
+            final DataType valueType = getDataTypeForField(protoType.getValueType());
+            return new MapDataType(valueType);
+        }
+
+        final Type fieldType = schema.getType(protoType);
+
+        if (fieldType instanceof MessageType) {
+            final RecordSchema recordSchema = createSchema(protoType.toString());
+            return new RecordDataType(recordSchema);
+        }
+        if (fieldType instanceof EnumType) {
+            final List<String> constantNames = ((EnumType) fieldType).getConstants().stream()
+                    .map(EnumConstant::getName)
+                    .collect(Collectors.toList());
+            return new EnumDataType(constantNames);
+        }
+        if (fieldType == null) {
+            throw new IllegalStateException("Proto type [" + protoType + "] not found in the provided proto files");
+        }
+        throw new IllegalStateException("Unknown proto type: " + fieldType);
+    }
+
+    /**
+     * Gets the suitable {@link DataType} for the provided scalar field.
+     *
+     * @param protoType field's type
+     * @return data type
+     * @throws IllegalArgumentException the scalar type is not supported
+     */
+    private DataType getDataTypeForScalarField(ProtoType protoType) {
+        switch (FieldType.findValue(protoType.getSimpleName())) {
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT32:
+            case SFIXED32:
+                return RecordFieldType.INT.getDataType();
+            case UINT32:
+            case SINT32:
+            case FIXED32:
+            case INT64:
+            case SINT64:
+            case SFIXED64:
+                return RecordFieldType.LONG.getDataType();
+            case UINT64:
+            case FIXED64:
+                return RecordFieldType.BIGINT.getDataType();
+            case BOOL:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case BYTES:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            default:
+                throw new IllegalArgumentException("Unsupported scalar proto type: " + protoType);
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
new file mode 100644
index 0000000000..d6977847e8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.schema;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link SchemaAccessStrategy} that builds the record schema from a proto message definition. The variables, content
+ * stream and read schema passed to {@link #getSchema} are ignored.
+ */
+public class ProtoSchemaStrategy implements SchemaAccessStrategy {
+
+    private final String messageType;
+    private final Schema schema;
+
+    /**
+     * @param messageType name of the proto message type the record schema is built for
+     * @param schema Wire schema containing the message type
+     */
+    public ProtoSchemaStrategy(String messageType, Schema schema) {
+        this.messageType = messageType;
+        this.schema = schema;
+    }
+
+    /**
+     * Builds a fresh record schema for the configured message type on each call.
+     */
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) {
+        return new ProtoSchemaParser(schema).createSchema(messageType);
+    }
+
+    /**
+     * @return an empty set: this strategy supplies no {@link SchemaField}s
+     */
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        final Set<SchemaField> suppliedFields = EnumSet.noneOf(SchemaField.class);
+        return suppliedFields;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
new file mode 100644
index 0000000000..35cce3bf51
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.validation;
+
+import com.squareup.wire.schema.Schema;
+
+/**
+ * Immutable pair of a proto directory and the Wire {@link Schema} loaded from it, allowing the schema to be reused
+ * while the configured directory stays the same.
+ */
+public class ProtoValidationResource {
+
+    private final Schema protoSchema;
+    private final String protoDirectory;
+
+    /**
+     * @param protoDirectory directory the schema was loaded from
+     * @param protoSchema schema loaded from the directory
+     */
+    public ProtoValidationResource(String protoDirectory, Schema protoSchema) {
+        this.protoSchema = protoSchema;
+        this.protoDirectory = protoDirectory;
+    }
+
+    /** @return directory the schema was loaded from */
+    public String getProtoDirectory() {
+        return this.protoDirectory;
+    }
+
+    /** @return schema loaded from the directory */
+    public Schema getProtoSchema() {
+        return this.protoSchema;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..44ded1008f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.services.protobuf.ProtobufReader
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html
new file mode 100644
index 0000000000..0228875ffc
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html
@@ -0,0 +1,189 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8"/>
+ <title>ProtobufReader</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css"
type="text/css"/>
+ </head>
+
+ <body>
+ <p>
+ The ProtobufReader Controller Service reads and parses a Protocol
Buffers Message from binary format and creates a Record object.
+ The Controller Service must be configured with the same '.proto'
file that was used for the Message encoding, and the fully qualified
+ name of the Message type including its package (e.g.
mypackage.MyMessage). The Reader will always generate one record from the input
+ data which represents the provided Protocol Buffers Message type.
+ Further information about Protocol Buffers can be found here:
+ <a href="https://protobuf.dev/">protobuf.dev</a>
+ </p>
+
+ <h2>Data Type Mapping</h2>
+
+ <p>
+ When a record is parsed from incoming data, the Controller Service
is going to map the Proto Message field types to the corresponding
+ NiFi data types. The mapping between the provided Message fields
and the encoded input is always based on the field tag numbers.
+ When a field is defined as 'repeated', its data type will be an array whose element type is the field's originally specified type.
+ The following tables show which proto field type will correspond
to which NiFi field type after the conversion.
+ </p>
+
+ <h3>
+ Scalar Value Types
+ </h3>
+
+ <div>
+ <table>
+ <tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr>
+ <tr><td>double</td><td>fixed64</td><td>double</td></tr>
+ <tr><td>float</td><td>fixed32</td><td>float</td></tr>
+ <tr><td>int32</td><td>varint</td><td>int</td></tr>
+ <tr><td>int64</td><td>varint</td><td>long</td></tr>
+ <tr><td>uint32</td><td>varint</td><td>long</td></tr>
+ <tr><td>uint64</td><td>varint</td><td>bigint</td></tr>
+ <tr><td>sint32</td><td>varint</td><td>long</td></tr>
+ <tr><td>sint64</td><td>varint</td><td>long</td></tr>
+ <tr><td>fixed32</td><td>fixed32</td><td>long</td></tr>
+ <tr><td>fixed64</td><td>fixed64</td><td>bigint</td></tr>
+ <tr><td>sfixed32</td><td>fixed32</td><td>int</td></tr>
+ <tr><td>sfixed64</td><td>fixed64</td><td>long</td></tr>
+ <tr><td>bool</td><td>varint</td><td>boolean</td></tr>
+ <tr><td>string</td><td>length delimited</td><td>string</td></tr>
+ <tr><td>bytes</td><td>length delimited</td><td>array[byte]</td></tr>
+ </table>
+ </div>
+
+ <h3>
+ Composite Value Types
+ </h3>
+
+ <div>
+ <table>
+ <tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr>
+ <tr><td>message</td><td>length delimited</td><td>record</td></tr>
+ <tr><td>enum</td><td>varint</td><td>enum</td></tr>
+ <tr><td>map</td><td>length delimited</td><td>map</td></tr>
+ <tr><td>oneof</td><td>-</td><td>choice</td></tr>
+ </table>
+ </div>
+
+ <h2>Schemas and Type Coercion</h2>
+
+ <p>
+ When a record is parsed from incoming data, it is separated into
fields. Each of these fields is then looked up against the
+ configured schema (by field name) in order to determine what the
type of the data should be. If the field is not present in
+ the schema, that field will be stored in the Record's value list with its original type. If the field is found in the schema,
+ the data type of the received data is compared against the data
type specified in the schema. If the types match, the value
+ of that field is used as-is. If the schema indicates that the
field should be of a different type, then the Controller Service
+ will attempt to coerce the data into the type specified by the
schema. If the field cannot be coerced into the specified type,
+ an Exception will be thrown.
+ </p>
+
+ <p>
+ The following rules apply when attempting to coerce a field value
from one data type to another:
+ </p>
+
+ <ul>
+ <li>Any data type can be coerced into a String type.</li>
+ <li>Any numeric data type (Int, Long, Float, Double) can be
coerced into any other numeric data type.</li>
+ <li>Any numeric value can be coerced into a Date, Time, or
Timestamp type, by assuming that the Long value is the number of
+ milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+ <li>A String value can be coerced into a Date, Time, or Timestamp
type, if its format matches the configured "Date Format," "Time Format,"
+ or "Timestamp Format."</li>
+ <li>A String value can be coerced into a numeric value if the
value is of the appropriate type. For example, the String value
+ <code>8</code> can be coerced into any numeric type. However,
the String value <code>8.2</code> can be coerced into a Double or Float
+ type but not an Integer.</li>
+ <li>A String value of "true" or "false" (regardless of case) can
be coerced into a Boolean value.</li>
+ <li>A String value that is not empty can be coerced into a Char
type. If the String contains more than 1 character, the first character is used
+ and the rest of the characters are ignored.</li>
+ </ul>
+
+ <p>
+ If none of the above rules apply when attempting to coerce a value
from one data type to another, the coercion will fail and an Exception
+ will be thrown.
+ </p>
+
+ <h2>Schema Access Strategy</h2>
+
+ <p>
+ Besides the common Schema Access strategies, such as reading the schema from a property value or from a Schema Registry,
+ the ProtobufReader Controller Service offers another access strategy option called "Generate from Proto file". When using this strategy,
+ the Reader will generate the Record Schema from the provided '.proto' file and Message type. This is the recommended strategy when the user
+ doesn't want to create the schema manually or when no type coercion is needed.
+ </p>
+
+
+ <h2>Protobuf Any Field Type</h2>
+
+ <p>
+ Protocol Buffers offers further Message types called Well-Known Types. These are additionally provided messages that define
+ complex structured types and wrappers for scalar types. The Any type is one of these Well-Known Types; it is used to store an arbitrary
+ serialized Message along with a URL that describes the type of the serialized Message. Since the Message type and the embedded Message will be
+ available only when the Any Message is already populated with data, the ProtobufReader needs to do this Message processing at data conversion time.
+ The Reader is capable of generating a schema for the embedded Message in the Any field and replacing the Any field's schema with it in the resulting Record schema.
+ </p>
+
+ <h3>Example</h3>
+
+ <p>
+ There is a Message called 'TestMessage' which has only one field, an Any typed field. There is another Message called 'NestedMessage'
+ that we would like to add as a serialized Message in the value of 'anyField'.
+ </p>
+
+<pre><code>
+message Any {
+ string type_url = 1;
+ bytes value = 2;
+}
+
+message TestMessage {
+ google.protobuf.Any anyField = 3;
+}
+
+message NestedMessage {
+ string field_1 = 1;
+ string field_2 = 2;
+ string field_3 = 3;
+}
+</code></pre>
+
+ <p>
+ With normal data conversion our result would look like this:
+ </p>
+
+<pre><code>
+{
+ anyField : {
+ type_url : "type.googleapis.com/NestedMessage",
+ value : [84, 101, 115, 116, 32, 98, 121, 116, 101, 115]
+ }
+}
+</code></pre>
+
+ <p>
+ Result after the Protobuf Reader replaces the Any Message's fields
with the processed embedded Message:
+ </p>
+
+<pre><code>
+{
+ anyField : {
+ field_1 : "value 1",
+ field_2 : "value 2",
+ field_3 : "value 3"
+ }
+}
+</code></pre>
+
+ </body>
+</html>
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
new file mode 100644
index 0000000000..4a10c0ecfd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.squareup.wire.schema.CoreLoaderKt;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.SchemaLoader;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_KEY_FIELD_NAME;
+import static
org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_VALUE_FIELD_NAME;
+
+public class ProtoTestUtil {
+
+    public static final String BASE_TEST_PATH = "src/test/resources/";
+
+    /**
+     * Loads a Wire schema rooted at the test_proto3.proto test resource.
+     *
+     * @return the loaded Wire schema
+     */
+    public static Schema loadProto3TestSchema() {
+        final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
+        schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + "test_proto3.proto")), Collections.emptyList());
+        return schemaLoader.loadSchema();
+    }
+
+    /**
+     * Loads a Wire schema rooted at the test_proto2.proto test resource, plus google/protobuf/any.proto
+     * taken from the Wire runtime jar (the proto2 test data embeds an Any value).
+     *
+     * @return the loaded Wire schema
+     */
+    public static Schema loadProto2TestSchema() {
+        final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
+        schemaLoader.initRoots(Arrays.asList(
+                Location.get(BASE_TEST_PATH, "test_proto2.proto"),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/any.proto")), Collections.emptyList());
+        return schemaLoader.loadSchema();
+    }
+
+    /**
+     * Builds a serialized Proto3Message (described by test_proto3.desc). Fields 1-15 hold fixed test values.
+     * Field 16 holds a NestedMessage with an enum, a repeated field, oneof options and a two-entry map.
+     *
+     * @return stream over the serialized message bytes
+     * @throws IOException if the descriptor file cannot be read or parsed
+     * @throws Descriptors.DescriptorValidationException if the file descriptor cannot be built
+     */
+    public static InputStream generateInputDataForProto3() throws IOException, Descriptors.DescriptorValidationException {
+        final Descriptors.FileDescriptor fileDescriptor = loadFileDescriptor("test_proto3.desc");
+
+        final Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto3Message");
+        final Descriptors.Descriptor nestedMessageDescriptor = fileDescriptor.findMessageTypeByName("NestedMessage");
+        final Descriptors.EnumDescriptor enumValueDescriptor = fileDescriptor.findEnumTypeByName("TestEnum");
+        final Descriptors.Descriptor mapDescriptor = nestedMessageDescriptor.findNestedTypeByName("TestMapEntry");
+
+        final DynamicMessage mapEntry1 = DynamicMessage
+                .newBuilder(mapDescriptor)
+                .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry1")
+                .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 101)
+                .build();
+
+        final DynamicMessage mapEntry2 = DynamicMessage
+                .newBuilder(mapDescriptor)
+                .setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry2")
+                .setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 202)
+                .build();
+
+        final DynamicMessage nestedMessage = DynamicMessage
+                .newBuilder(nestedMessageDescriptor)
+                .setField(nestedMessageDescriptor.findFieldByNumber(20), enumValueDescriptor.findValueByNumber(2))
+                .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 1")
+                .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 2")
+                .addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 3")
+                // NOTE(review): fields 22-24 presumably share one oneof, so each set replaces the previous one;
+                // TestProtobufDataConverter expects only the last of them (int32Option = 3) to remain set
+                .setField(nestedMessageDescriptor.findFieldByNumber(22), "One Of Option")
+                .setField(nestedMessageDescriptor.findFieldByNumber(23), true)
+                .setField(nestedMessageDescriptor.findFieldByNumber(24), 3)
+                .setField(nestedMessageDescriptor.findFieldByNumber(25), Arrays.asList(mapEntry1, mapEntry2))
+                .build();
+
+        // NOTE(review): fields 4, 6, 12 and 14 are presumably the unsigned types; the negative Java values
+        // are meant to read back as the maximum unsigned values asserted in TestProtobufDataConverter
+        final DynamicMessage message = DynamicMessage
+                .newBuilder(messageDescriptor)
+                .setField(messageDescriptor.findFieldByNumber(1), true)
+                .setField(messageDescriptor.findFieldByNumber(2), "Test text")
+                .setField(messageDescriptor.findFieldByNumber(3), Integer.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(4), -1)
+                .setField(messageDescriptor.findFieldByNumber(5), Integer.MIN_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(6), -2)
+                .setField(messageDescriptor.findFieldByNumber(7), Integer.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(8), Double.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(9), Float.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(10), "Test bytes".getBytes())
+                .setField(messageDescriptor.findFieldByNumber(11), Long.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(12), -1L)
+                .setField(messageDescriptor.findFieldByNumber(13), Long.MIN_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(14), -2L)
+                .setField(messageDescriptor.findFieldByNumber(15), Long.MAX_VALUE)
+                .setField(messageDescriptor.findFieldByNumber(16), nestedMessage)
+                .build();
+
+        return message.toByteString().newInput();
+    }
+
+    /**
+     * Builds a serialized Proto2Message (described by test_proto2.desc, which is built against
+     * google/protobuf/any.desc). The message has field 1 set, field 3 set to an Any wrapping an
+     * AnyValueMessage, and the extensionField extension set.
+     *
+     * @return stream over the serialized message bytes
+     * @throws IOException if a descriptor file cannot be read or parsed
+     * @throws Descriptors.DescriptorValidationException if a file descriptor cannot be built
+     */
+    public static InputStream generateInputDataForProto2() throws IOException, Descriptors.DescriptorValidationException {
+        final Descriptors.FileDescriptor anyDesc = loadFileDescriptor("google/protobuf/any.desc");
+        final Descriptors.FileDescriptor fileDescriptor = loadFileDescriptor("test_proto2.desc", anyDesc);
+
+        final Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto2Message");
+        final Descriptors.Descriptor anyTestDescriptor = fileDescriptor.findMessageTypeByName("AnyValueMessage");
+        final Descriptors.FieldDescriptor fieldDescriptor = fileDescriptor.findExtensionByName("extensionField");
+        final Descriptors.Descriptor anyDescriptor = anyDesc.findMessageTypeByName("Any");
+
+        final DynamicMessage anyTestMessage = DynamicMessage
+                .newBuilder(anyTestDescriptor)
+                .setField(anyTestDescriptor.findFieldByNumber(1), "Test field 1")
+                .setField(anyTestDescriptor.findFieldByNumber(2), "Test field 2")
+                .build();
+
+        // Any = (type_url, value): the URL names the packed message type, the value holds its serialized bytes
+        final DynamicMessage anyMessage = DynamicMessage
+                .newBuilder(anyDescriptor)
+                .setField(anyDescriptor.findFieldByNumber(1), "type.googleapis.com/AnyValueMessage")
+                .setField(anyDescriptor.findFieldByNumber(2), anyTestMessage.toByteArray())
+                .build();
+
+        final DynamicMessage message = DynamicMessage
+                .newBuilder(messageDescriptor)
+                .setField(messageDescriptor.findFieldByNumber(1), true)
+                .setField(messageDescriptor.findFieldByNumber(3), anyMessage)
+                .setField(fieldDescriptor, Integer.MAX_VALUE)
+                .build();
+
+        return message.toByteString().newInput();
+    }
+
+    /**
+     * Reads a serialized FileDescriptorSet from the test resources and builds its first file descriptor.
+     * The file stream is closed once the set has been parsed (the previous inline code leaked it).
+     *
+     * @param descriptorFileName descriptor set file name, relative to {@link #BASE_TEST_PATH}
+     * @param dependencies       already built descriptors of the files the descriptor depends on
+     * @return the built file descriptor
+     * @throws IOException if the file cannot be read or parsed
+     * @throws Descriptors.DescriptorValidationException if the descriptor cannot be built
+     */
+    private static Descriptors.FileDescriptor loadFileDescriptor(final String descriptorFileName,
+                                                                 final Descriptors.FileDescriptor... dependencies)
+            throws IOException, Descriptors.DescriptorValidationException {
+        try (InputStream descriptorStream = new FileInputStream(BASE_TEST_PATH + descriptorFileName)) {
+            final DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(descriptorStream);
+            return Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), dependencies);
+        }
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java
new file mode 100644
index 0000000000..cd433015de
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.google.protobuf.Descriptors;
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class TestProtobufRecordReader {
+
+ private static Schema protoSchema;
+
+ @BeforeAll
+ public static void setup(){
+ protoSchema = loadProto3TestSchema();
+ }
+
+ @Test
+ public void testReadRecord() throws
Descriptors.DescriptorValidationException, IOException {
+ final ProtobufRecordReader reader =
createReader(generateInputDataForProto3(), "Proto3Message", protoSchema,
generateRecordSchema());
+ final Record record = reader.nextRecord(false, false);
+
+ final Object field1 = record.getValue("booleanField");
+ assertEquals(true, field1);
+ assertInstanceOf(Boolean.class, field1);
+
+ final Object field2 = record.getValue("stringField");
+ assertEquals("Test text", field2);
+ assertInstanceOf(String.class, field2);
+
+ final Object field3 = record.getValue("int32Field");
+ assertEquals(Integer.MAX_VALUE, field3);
+ assertInstanceOf(Integer.class, field3);
+
+ final Object field4 = record.getValue("uint32Field");
+ assertNotNull(field4);
+ }
+
+ @Test
+ public void testReadRecordWithCoerceType() throws
Descriptors.DescriptorValidationException, IOException {
+ final ProtobufRecordReader reader =
createReader(generateInputDataForProto3(), "Proto3Message", protoSchema,
generateRecordSchema());
+ final Record record = reader.nextRecord(true, false);
+
+ final Object field1 = record.getValue("booleanField");
+ assertEquals("true", field1);
+ assertInstanceOf(String.class, field1);
+
+ final Object field2 = record.getValue("stringField");
+ assertEquals("Test text", field2);
+ assertInstanceOf(String.class, field2);
+
+ final Object field3 = record.getValue("int32Field");
+ assertEquals(String.valueOf(Integer.MAX_VALUE), field3);
+ assertInstanceOf(String.class, field3);
+
+ final Object field4 = record.getValue("uint32Field");
+ assertNotNull(field4);
+ }
+
+ @Test
+ public void testReadRecordWithDropUnknownFields() throws
Descriptors.DescriptorValidationException, IOException {
+ final ProtobufRecordReader reader =
createReader(generateInputDataForProto3(), "Proto3Message", protoSchema,
generateRecordSchema());
+ final Record record = reader.nextRecord(false, true);
+
+ final Object field1 = record.getValue("booleanField");
+ assertEquals(true, field1);
+ assertInstanceOf(Boolean.class, field1);
+
+ final Object field2 = record.getValue("stringField");
+ assertEquals("Test text", field2);
+ assertInstanceOf(String.class, field2);
+
+ final Object field3 = record.getValue("int32Field");
+ assertEquals(Integer.MAX_VALUE, field3);
+ assertInstanceOf(Integer.class, field3);
+
+ final Object field4 = record.getValue("uint32Field");
+ assertNull(field4);
+ }
+
+ @Test
+ public void testReadRecordWithCoerceTypeAndDropUnknownFields() throws
Descriptors.DescriptorValidationException, IOException {
+ final ProtobufRecordReader reader =
createReader(generateInputDataForProto3(), "Proto3Message", protoSchema,
generateRecordSchema());
+ final Record record = reader.nextRecord(true, true);
+
+ final Object field1 = record.getValue("booleanField");
+ assertEquals("true", field1);
+ assertInstanceOf(String.class, field1);
+
+ final Object field2 = record.getValue("stringField");
+ assertEquals("Test text", field2);
+ assertInstanceOf(String.class, field2);
+
+ final Object field3 = record.getValue("int32Field");
+ assertEquals(String.valueOf(Integer.MAX_VALUE), field3);
+ assertInstanceOf(String.class, field3);
+
+ final Object field4 = record.getValue("uint32Field");
+ assertNull(field4);
+ }
+
+ private RecordSchema generateRecordSchema() {
+ final List<RecordField> fields = new ArrayList<>();
+ for (final String fieldName : new String[] {"booleanField",
"stringField", "int32Field"}) {
+ fields.add(new RecordField(fieldName,
RecordFieldType.STRING.getDataType()));
+ }
+ return new SimpleRecordSchema(fields);
+ }
+
+ private ProtobufRecordReader createReader(InputStream in, String message,
Schema schema, RecordSchema recordSchema) {
+ return new ProtobufRecordReader(schema, message, in, recordSchema);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
new file mode 100644
index 0000000000..df811f506b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.google.protobuf.Descriptors;
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.services.protobuf.ProtoTestUtil;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestProtobufDataConverter {
+
+ @Test
+ public void testDataConverterForProto3() throws
Descriptors.DescriptorValidationException, IOException {
+ final Schema schema = loadProto3TestSchema();
+ final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("Proto3Message");
+
+ final ProtobufDataConverter dataConverter = new
ProtobufDataConverter(schema, "Proto3Message", recordSchema, false, false);
+ final MapRecord record =
dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto3());
+
+ assertEquals(true, record.getValue("booleanField"));
+ assertEquals("Test text", record.getValue("stringField"));
+ assertEquals(Integer.MAX_VALUE, record.getValue("int32Field"));
+ assertEquals(4294967295L, record.getValue("uint32Field"));
+ assertEquals(Integer.MIN_VALUE, record.getValue("sint32Field"));
+ assertEquals(4294967294L, record.getValue("fixed32Field"));
+ assertEquals(Integer.MAX_VALUE, record.getValue("sfixed32Field"));
+ assertEquals(Double.MAX_VALUE, record.getValue("doubleField"));
+ assertEquals(Float.MAX_VALUE, record.getValue("floatField"));
+ assertArrayEquals("Test bytes".getBytes(), (byte[])
record.getValue("bytesField"));
+ assertEquals(Long.MAX_VALUE, record.getValue("int64Field"));
+ assertEquals(new BigInteger("18446744073709551615"),
DataTypeUtils.toBigInt(record.getValue("uint64Field"), "field12"));
+ assertEquals(Long.MIN_VALUE, record.getValue("sint64Field"));
+ assertEquals(new BigInteger("18446744073709551614"),
DataTypeUtils.toBigInt(record.getValue("fixed64Field"), "field14"));
+ assertEquals(Long.MAX_VALUE, record.getValue("sfixed64Field"));
+
+ final MapRecord nestedRecord = (MapRecord)
record.getValue("nestedMessage");
+ assertEquals("ENUM_VALUE_3", nestedRecord.getValue("testEnum"));
+
+ assertArrayEquals(new Object[]{"Repeated 1", "Repeated 2", "Repeated
3"}, (Object[]) nestedRecord.getValue("repeatedField"));
+
+ // assert only one field is set in the OneOf field
+ assertNull(nestedRecord.getValue("stringOption"));
+ assertNull(nestedRecord.getValue("booleanOption"));
+ assertEquals(3, nestedRecord.getValue("int32Option"));
+
+ final Map<String, Integer> expectedMap = new HashMap<String,
Integer>() {{
+ put("test_key_entry1", 101);
+ put("test_key_entry2", 202);
+ }};
+ assertEquals(expectedMap, nestedRecord.getValue("testMap"));
+ }
+
+ @Test
+ public void testDataConverterForProto2() throws
Descriptors.DescriptorValidationException, IOException {
+ final Schema schema = loadProto2TestSchema();
+ final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("Proto2Message");
+
+ final ProtobufDataConverter dataConverter = new
ProtobufDataConverter(schema, "Proto2Message", recordSchema, false, false);
+ final MapRecord record =
dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2());
+
+ assertEquals(true, record.getValue("booleanField"));
+ assertEquals("Missing field", record.getValue("stringField"));
+ assertEquals(Integer.MAX_VALUE, record.getValue("extensionField"));
+
+ final MapRecord anyValueRecord = (MapRecord)
record.getValue("anyField");
+ assertEquals("Test field 1",
anyValueRecord.getValue("anyStringField1"));
+ assertEquals("Test field 2",
anyValueRecord.getValue("anyStringField2"));
+ }
+
+ @Test
+ public void testMissingMessage() {
+ final Schema schema = loadProto3TestSchema();
+ final RecordSchema recordSchema = new
ProtoSchemaParser(schema).createSchema("Proto3Message");
+
+ final ProtobufDataConverter dataConverter = new
ProtobufDataConverter(schema, "MissingMessage", recordSchema, false, false);
+
+ NullPointerException e = assertThrows(NullPointerException.class, ()
-> dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2()));
+ assertTrue(e.getMessage().contains("Message with name [MissingMessage]
not found in the provided proto files"), e.getMessage());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java
new file mode 100644
index 0000000000..d313bb595c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.schema;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
+import static
org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestProtoSchemaParser {
+
+    /**
+     * Parses Proto3Message from the proto3 test schema and compares the result with the expected
+     * record schema, covering every proto3 scalar type and a nested message with enum, repeated,
+     * map and oneof fields.
+     */
+    @Test
+    public void testSchemaParserForProto3() {
+        // NestedMessage: testMap is expected ahead of the oneof members even though test_proto3.proto
+        // declares it last, and each oneof member is expected as a field of its own.
+        final RecordSchema nestedMessageSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testEnum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("ENUM_VALUE_1", "ENUM_VALUE_2", "ENUM_VALUE_3"))),
+                new RecordField("repeatedField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())),
+                new RecordField("testMap", RecordFieldType.MAP.getMapDataType(RecordFieldType.INT.getDataType())),
+                new RecordField("stringOption", RecordFieldType.STRING.getDataType()),
+                new RecordField("booleanOption", RecordFieldType.BOOLEAN.getDataType()),
+                new RecordField("int32Option", RecordFieldType.INT.getDataType())
+        ));
+
+        final SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType()),
+                new RecordField("stringField", RecordFieldType.STRING.getDataType()),
+                new RecordField("int32Field", RecordFieldType.INT.getDataType()),
+                // uint32, sint32 and fixed32 are expected as LONG; sfixed32 stays INT
+                new RecordField("uint32Field", RecordFieldType.LONG.getDataType()),
+                new RecordField("sint32Field", RecordFieldType.LONG.getDataType()),
+                new RecordField("fixed32Field", RecordFieldType.LONG.getDataType()),
+                new RecordField("sfixed32Field", RecordFieldType.INT.getDataType()),
+                new RecordField("doubleField", RecordFieldType.DOUBLE.getDataType()),
+                new RecordField("floatField", RecordFieldType.FLOAT.getDataType()),
+                new RecordField("bytesField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
+                new RecordField("int64Field", RecordFieldType.LONG.getDataType()),
+                // unsigned 64-bit values (uint64, fixed64) do not fit in a signed long, hence BIGINT
+                new RecordField("uint64Field", RecordFieldType.BIGINT.getDataType()),
+                new RecordField("sint64Field", RecordFieldType.LONG.getDataType()),
+                new RecordField("fixed64Field", RecordFieldType.BIGINT.getDataType()),
+                new RecordField("sfixed64Field", RecordFieldType.LONG.getDataType()),
+                new RecordField("nestedMessage", RecordFieldType.RECORD.getRecordDataType(nestedMessageSchema))
+        ));
+
+        final ProtoSchemaParser parser = new ProtoSchemaParser(loadProto3TestSchema());
+        assertEquals(expectedSchema, parser.createSchema("Proto3Message"));
+    }
+
+    /**
+     * Parses Proto2Message from the proto2 test schema, checking proto2 specifics: a required
+     * (non-nullable) field, a declared default value, a google.protobuf.Any field and an extension field.
+     */
+    @Test
+    public void testSchemaParserForProto2() {
+        // The Any field is described structurally by google.protobuf.Any's own two fields.
+        final RecordSchema anySchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("type_url", RecordFieldType.STRING.getDataType()),
+                new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))
+        ));
+
+        final SimpleRecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType(), false),
+                new RecordField("stringField", RecordFieldType.STRING.getDataType(), "Missing field", true),
+                new RecordField("anyField", RecordFieldType.RECORD.getRecordDataType(anySchema)),
+                new RecordField("extensionField", RecordFieldType.INT.getDataType())
+        ));
+
+        final ProtoSchemaParser parser = new ProtoSchemaParser(loadProto2TestSchema());
+        assertEquals(expectedSchema, parser.createSchema("Proto2Message"));
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc
new file mode 100644
index 0000000000..75c391b226
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc
@@ -0,0 +1,7 @@
+
+�
+ any.protogoogle.protobuf"6
+Any
+type_url ( RtypeUrl
+value (RvalueBv
+com.google.protobufBAnyProtoPZ,google.golang.org/protobuf/types/known/anypb�GPB�Google.Protobuf.WellKnownTypesbproto3
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc
new file mode 100644
index 0000000000..e938726323
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc
@@ -0,0 +1,11 @@
+
+�
+test_proto2.protogoogle/protobuf/any.proto"�
+ Proto2Message"
+booleanField (RbooleanField/
+stringField ( : Missing fieldRstringField0
+anyField (2.google.protobuf.AnyRanyField*d����"e
+AnyValueMessage(
+anyStringField1 ( RanyStringField1(
+anyStringField2 ( RanyStringField2:6
+extensionField.Proto2Messaged (RextensionField
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto
new file mode 100644
index 0000000000..11d71e6c44
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+import "google/protobuf/any.proto";
+
+// Proto2 test message exercising proto2-only features: a required field, an explicit
+// default value, an extension range and a google.protobuf.Any payload.
+// NOTE(review): test_proto2.desc appears to be the compiled descriptor of this file (it names
+// test_proto2.proto and these fields); keep the two in sync whenever this file changes.
+message Proto2Message {
+ // Field numbers 100 and above are reserved for extensions (extensionField uses 100).
+ extensions 100 to max;
+ // The schema parser test expects this as a BOOLEAN record field that is not nullable.
+ required bool booleanField = 1;
+ // The schema parser test expects this declared default as the record field's default value.
+ optional string stringField = 2 [default = "Missing field"];
+ // In the record schema this appears as the raw Any structure (type_url, value); the data
+ // converter test instead expects it unpacked into AnyValueMessage's fields.
+ optional google.protobuf.Any anyField = 3;
+}
+
+// Payload type the data converter test expects to find inside Proto2Message.anyField; it reads
+// anyStringField1 and anyStringField2 from the unpacked record.
+message AnyValueMessage {
+ optional string anyStringField1 = 1;
+ optional string anyStringField2 = 2;
+}
+
+// Extension field in Proto2Message's extension range (100 to max). The schema parser test
+// expects it as an ordinary INT record field; the data converter test expects Integer.MAX_VALUE.
+extend Proto2Message {
+ optional int32 extensionField = 100;
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc
new file mode 100644
index 0000000000..a2316f3f87
Binary files /dev/null and b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc differ
diff --git
a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto
new file mode 100644
index 0000000000..a6ddec0e61
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+// Proto3 test message declaring each proto3 scalar type once (field numbers 1-15) plus a nested
+// message. TestProtoSchemaParser pins the record type each field maps to; the non-obvious
+// mappings are noted below.
+// NOTE(review): test_proto3.desc is committed next to this file and is presumably its compiled
+// descriptor; keep the two in sync whenever this file changes.
+message Proto3Message {
+ bool booleanField = 1;
+ string stringField = 2;
+ int32 int32Field = 3;
+ // uint32, sint32 and fixed32 are expected as LONG; sfixed32 as INT.
+ uint32 uint32Field = 4;
+ sint32 sint32Field = 5;
+ fixed32 fixed32Field = 6;
+ sfixed32 sfixed32Field = 7;
+ double doubleField = 8;
+ float floatField = 9;
+ // Expected as an ARRAY of BYTE.
+ bytes bytesField = 10;
+ // uint64 and fixed64 are expected as BIGINT; int64, sint64 and sfixed64 as LONG.
+ int64 int64Field = 11;
+ uint64 uint64Field = 12;
+ sint64 sint64Field = 13;
+ fixed64 fixed64Field = 14;
+ sfixed64 sfixed64Field = 15;
+ // Expected as a nested RECORD built from NestedMessage.
+ NestedMessage nestedMessage = 16;
+}
+
+// Nested message covering enum, repeated, oneof and map fields.
+message NestedMessage {
+ // Expected as an ENUM record type with the three TestEnum symbols.
+ TestEnum testEnum = 20;
+ // Expected as an ARRAY of STRING.
+ repeated string repeatedField = 21;
+ // TestProtoSchemaParser expects each oneof member as its own record field
+ // (stringOption, booleanOption, int32Option); there is no field named oneOfField.
+ oneof oneOfField {
+ string stringOption = 22;
+ bool booleanOption = 23;
+ int32 int32Option = 24;
+ }
+ // Expected as a MAP with INT values (the proto map key type is string).
+ map<string, int32> testMap = 25;
+}
+
+// Enum used by NestedMessage.testEnum. ENUM_VALUE_1 = 0 is the proto3 default value.
+enum TestEnum {
+ ENUM_VALUE_1 = 0;
+ ENUM_VALUE_2 = 1;
+ ENUM_VALUE_3 = 2;
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml
new file mode 100644
index 0000000000..04d9553db1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.26.0-SNAPSHOT</version>
+    </parent>
+
+    <!-- Aggregator for the Protobuf record reader: the services module and the NAR that packages it. -->
+    <artifactId>nifi-protobuf-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-protobuf-services</module>
+        <module>nifi-protobuf-services-nar</module>
+    </modules>
+
+    <!--
+        Versions (and, for most entries, scopes) managed for every module of this bundle, listed
+        alphabetically by groupId and artifactId. Every entry except Guava is managed with the
+        "provided" scope, which also applies when the artifact arrives transitively.
+        NOTE(review): presumably these libraries are supplied at runtime by a parent NAR; confirm
+        against the dependencies of nifi-protobuf-services-nar.
+    -->
+    <dependencyManagement>
+        <dependencies>
+            <!-- Jackson, all on the shared jackson.bom.version property. -->
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.bom.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.bom.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.bom.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>${caffeine.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <!-- Guava: only the version is pinned; the scope is left to the declaring module. -->
+            <dependency>
+                <groupId>com.google.guava</groupId>
+                <artifactId>guava</artifactId>
+                <version>32.1.2-jre</version>
+            </dependency>
+
+            <!-- Apache Commons libraries. -->
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>${org.apache.commons.io.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-compress</artifactId>
+                <version>${org.apache.commons.compress.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-lang3</artifactId>
+                <version>${org.apache.commons.lang3.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-text</artifactId>
+                <version>${org.apache.commons.text.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <!-- Kotlin standard library artifacts, all on the shared kotlin.version property. -->
+            <dependency>
+                <groupId>org.jetbrains.kotlin</groupId>
+                <artifactId>kotlin-stdlib</artifactId>
+                <version>${kotlin.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.jetbrains.kotlin</groupId>
+                <artifactId>kotlin-stdlib-common</artifactId>
+                <version>${kotlin.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.jetbrains.kotlin</groupId>
+                <artifactId>kotlin-stdlib-jdk8</artifactId>
+                <version>${kotlin.version}</version>
+                <scope>provided</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 67fd6fd8c7..20e49475f9 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -127,6 +127,7 @@
<module>nifi-cipher-bundle</module>
<module>nifi-compress-bundle</module>
<module>nifi-opentelemetry-bundle</module>
+ <module>nifi-protobuf-bundle</module>
</modules>
<build>