This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c0c1b386f6 NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO
c0c1b386f6 is described below

commit c0c1b386f61737635eab3f42e3d0b97296fa09f3
Author: Paul Grey <[email protected]>
AuthorDate: Tue Aug 22 16:42:03 2023 -0400

    NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #7638
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |   1 +
 .../processors/gcp/pubsub/PublishGCPubSub.java     |  27 ++++++++--------
 .../processors/gcp/pubsub/PublishGCPubSubTest.java |  36 +++++++++++++++------
 .../src/test/resources/pubsub/records.avro         | Bin 0 -> 374 bytes
 4 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 84e84417ea..13fab32080 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -209,6 +209,7 @@
                         
<exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
                         
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
                         
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
+                        
<exclude>src/test/resources/pubsub/records.avro</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index f2d5ed7ebd..d48652f0b6 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -369,20 +369,21 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
             final List<Throwable> failures = new ArrayList<>();
 
             final Map<String, String> attributes = flowFile.getAttributes();
-            final RecordReader reader = readerFactory.createRecordReader(
-                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger());
-            final RecordSet recordSet = reader.createRecordSet();
-            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
-
-            final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
-            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
-
-            while (pushBackRecordSet.isAnotherRecord()) {
-                final ApiFuture<String> apiFuture = publishOneRecord(context, 
flowFile, baos, writer, pushBackRecordSet.next());
-                futures.add(apiFuture);
-                addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
+            try (final RecordReader reader = readerFactory.createRecordReader(
+                    attributes, session.read(flowFile), flowFile.getSize(), 
getLogger())) {
+                final RecordSet recordSet = reader.createRecordSet();
+                final RecordSchema schema = 
writerFactory.getSchema(attributes, recordSet.getSchema());
+
+                final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+                final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+
+                while (pushBackRecordSet.isAnotherRecord()) {
+                    final ApiFuture<String> apiFuture = 
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+                    futures.add(apiFuture);
+                    addCallback(apiFuture, new 
TrackedApiFutureCallback(successes, failures), executor);
+                }
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
             }
-            flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
         }
         finishBatch(session, stopWatch, flowFileResults);
     }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
index 2870942834..66bd66af72 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.pubsub.v1.Publisher;
 import io.grpc.Status;
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.avro.AvroReader;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
@@ -32,6 +33,7 @@ import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControll
 import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
 import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -154,12 +156,29 @@ public class PublishGCPubSubTest {
         runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
     }
 
+    @Test
+    void testSendOneSuccessRecordStrategyAvroReader() throws 
InitializationException, IOException {
+        runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
+        runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+        runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner, new AvroReader()));
+        runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
+        runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+
+        runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+                
getClass().getClassLoader().getResource("pubsub/records.avro"))));
+        runner.run(1, true, true);
+        runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+        assertEquals("3", 
flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
+    }
+
     @Test
     void testSendOneSuccessRecordStrategy() throws InitializationException, 
IOException {
         runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
         runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
         runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
-        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner, new JsonTreeReader()));
         runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
         runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
 
@@ -178,7 +197,7 @@ public class PublishGCPubSubTest {
         runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
         runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
         runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
-        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner, new JsonTreeReader()));
         runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
         runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
 
@@ -195,7 +214,7 @@ public class PublishGCPubSubTest {
         runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE, 
getCredentialsServiceId(runner));
         runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
         runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
-        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner));
+        runner.setProperty(PublishGCPubSub.RECORD_READER, 
getReaderServiceId(runner, new JsonTreeReader()));
         runner.setProperty(PublishGCPubSub.RECORD_WRITER, 
getWriterServiceId(runner));
         runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY, 
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
 
@@ -214,15 +233,14 @@ public class PublishGCPubSubTest {
         return controllerServiceId;
     }
 
-    private static String getReaderServiceId(TestRunner runner) throws 
InitializationException {
-        final ControllerService readerService = new JsonTreeReader();
-        final String readerServiceId = readerService.getClass().getName();
-        runner.addControllerService(readerServiceId, readerService);
-        runner.enableControllerService(readerService);
+    private static String getReaderServiceId(
+            final TestRunner runner, final RecordReaderFactory 
recordReaderFactory) throws InitializationException {
+        final String readerServiceId = 
recordReaderFactory.getClass().getName();
+        runner.addControllerService(readerServiceId, recordReaderFactory);
+        runner.enableControllerService(recordReaderFactory);
         return readerServiceId;
     }
 
-
     private static String getWriterServiceId(TestRunner runner) throws 
InitializationException {
         final ControllerService writerService = new JsonRecordSetWriter();
         final String writerServiceId = writerService.getClass().getName();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro
new file mode 100644
index 0000000000..3456b31d4b
Binary files /dev/null and b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro differ

Reply via email to