This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c0c1b386f6 NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO
c0c1b386f6 is described below
commit c0c1b386f61737635eab3f42e3d0b97296fa09f3
Author: Paul Grey <[email protected]>
AuthorDate: Tue Aug 22 16:42:03 2023 -0400
NIFI-11981 - PublishGCPubSub failure / Record-based processing / AVRO
Signed-off-by: Matt Burgess <[email protected]>
This closes #7638
---
.../nifi-gcp-bundle/nifi-gcp-processors/pom.xml | 1 +
.../processors/gcp/pubsub/PublishGCPubSub.java | 27 ++++++++--------
.../processors/gcp/pubsub/PublishGCPubSubTest.java | 36 +++++++++++++++------
.../src/test/resources/pubsub/records.avro | Bin 0 -> 374 bytes
4 files changed, 42 insertions(+), 22 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 84e84417ea..13fab32080 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -209,6 +209,7 @@
<exclude>src/test/resources/bigquery/schema-correct-data-with-date.avsc</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date.json</exclude>
<exclude>src/test/resources/bigquery/streaming-correct-data-with-date-formatted.json</exclude>
+ <exclude>src/test/resources/pubsub/records.avro</exclude>
</excludes>
</configuration>
</plugin>
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index f2d5ed7ebd..d48652f0b6 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -369,20 +369,21 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
final List<Throwable> failures = new ArrayList<>();
final Map<String, String> attributes = flowFile.getAttributes();
- final RecordReader reader = readerFactory.createRecordReader(
- attributes, session.read(flowFile), flowFile.getSize(),
getLogger());
- final RecordSet recordSet = reader.createRecordSet();
- final RecordSchema schema = writerFactory.getSchema(attributes,
recordSet.getSchema());
-
- final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, baos, attributes);
- final PushBackRecordSet pushBackRecordSet = new
PushBackRecordSet(recordSet);
-
- while (pushBackRecordSet.isAnotherRecord()) {
- final ApiFuture<String> apiFuture = publishOneRecord(context,
flowFile, baos, writer, pushBackRecordSet.next());
- futures.add(apiFuture);
- addCallback(apiFuture, new TrackedApiFutureCallback(successes,
failures), executor);
+ try (final RecordReader reader = readerFactory.createRecordReader(
+ attributes, session.read(flowFile), flowFile.getSize(),
getLogger())) {
+ final RecordSet recordSet = reader.createRecordSet();
+ final RecordSchema schema =
writerFactory.getSchema(attributes, recordSet.getSchema());
+
+ final RecordSetWriter writer =
writerFactory.createWriter(getLogger(), schema, baos, attributes);
+ final PushBackRecordSet pushBackRecordSet = new
PushBackRecordSet(recordSet);
+
+ while (pushBackRecordSet.isAnotherRecord()) {
+ final ApiFuture<String> apiFuture =
publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
+ futures.add(apiFuture);
+ addCallback(apiFuture, new
TrackedApiFutureCallback(successes, failures), executor);
+ }
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
}
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
}
finishBatch(session, stopWatch, flowFileResults);
}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
index 2870942834..66bd66af72 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSubTest.java
@@ -24,6 +24,7 @@ import com.google.cloud.pubsub.v1.Publisher;
import io.grpc.Status;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
@@ -32,6 +33,7 @@ import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControll
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -154,12 +156,29 @@ public class PublishGCPubSubTest {
runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_FAILURE, 1);
}
+ @Test
+ void testSendOneSuccessRecordStrategyAvroReader() throws
InitializationException, IOException {
+ runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
getCredentialsServiceId(runner));
+ runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
+ runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
+ runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner, new AvroReader()));
+ runner.setProperty(PublishGCPubSub.RECORD_WRITER,
getWriterServiceId(runner));
+ runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
+
+ runner.enqueue(IOUtils.toByteArray(Objects.requireNonNull(
+ getClass().getClassLoader().getResource("pubsub/records.avro"))));
+ runner.run(1, true, true);
+ runner.assertAllFlowFilesTransferred(PublishGCPubSub.REL_SUCCESS, 1);
+ final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(PublishGCPubSub.REL_SUCCESS).iterator().next();
+ assertEquals("3",
flowFile.getAttribute(PubSubAttributes.RECORDS_ATTRIBUTE));
+ }
+
@Test
void testSendOneSuccessRecordStrategy() throws InitializationException,
IOException {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
- runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner));
+ runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER,
getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@@ -178,7 +197,7 @@ public class PublishGCPubSubTest {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
- runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner));
+ runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER,
getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@@ -195,7 +214,7 @@ public class PublishGCPubSubTest {
runner.setProperty(PublishGCPubSub.GCP_CREDENTIALS_PROVIDER_SERVICE,
getCredentialsServiceId(runner));
runner.setProperty(PublishGCPubSub.TOPIC_NAME, TOPIC);
runner.setProperty(PublishGCPubSub.PROJECT_ID, PROJECT);
- runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner));
+ runner.setProperty(PublishGCPubSub.RECORD_READER,
getReaderServiceId(runner, new JsonTreeReader()));
runner.setProperty(PublishGCPubSub.RECORD_WRITER,
getWriterServiceId(runner));
runner.setProperty(PublishGCPubSub.MESSAGE_DERIVATION_STRATEGY,
MessageDerivationStrategy.RECORD_ORIENTED.getValue());
@@ -214,15 +233,14 @@ public class PublishGCPubSubTest {
return controllerServiceId;
}
- private static String getReaderServiceId(TestRunner runner) throws
InitializationException {
- final ControllerService readerService = new JsonTreeReader();
- final String readerServiceId = readerService.getClass().getName();
- runner.addControllerService(readerServiceId, readerService);
- runner.enableControllerService(readerService);
+ private static String getReaderServiceId(
+ final TestRunner runner, final RecordReaderFactory
recordReaderFactory) throws InitializationException {
+ final String readerServiceId =
recordReaderFactory.getClass().getName();
+ runner.addControllerService(readerServiceId, recordReaderFactory);
+ runner.enableControllerService(recordReaderFactory);
return readerServiceId;
}
-
private static String getWriterServiceId(TestRunner runner) throws
InitializationException {
final ControllerService writerService = new JsonRecordSetWriter();
final String writerServiceId = writerService.getClass().getName();
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro
new file mode 100644
index 0000000000..3456b31d4b
Binary files /dev/null and
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/resources/pubsub/records.avro
differ