[
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16693595#comment-16693595
]
ASF GitHub Bot commented on NIFI-4914:
--------------------------------------
Github user joewitt commented on a diff in the pull request:
https://github.com/apache/nifi/pull/3178#discussion_r235108402
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java
---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar. "
+ + "The complementary NiFi processor for sending messages is
PublishPulsarRecord. Please note that, at this time, "
+ + "the Processor assumes that all records that are retrieved have
the same schema. If any of the Pulsar messages "
+ + "that are pulled but cannot be parsed or written with the
configured Record Reader or Record Writer, the contents "
+ + "of the message will be written to a separate FlowFile, and that
FlowFile will be transferred to the 'parse.failure' "
+ + "relationship. Otherwise, each FlowFile is sent to the 'success'
relationship and may contain many individual "
+ + "messages within the single FlowFile. A 'record.count' attribute
is added to indicate how many messages are contained in the "
+ + "FlowFile. No two Pulsar messages will be placed into the same
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest",
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.count", description = "The number
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@SeeAlso({PublishPulsar.class, ConsumePulsar.class,
PublishPulsarRecord.class})
+public class ConsumePulsarRecord extends
AbstractPulsarConsumerProcessor<byte[]> {
+
+ public static final String MSG_COUNT = "record.count";
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use for incoming FlowFiles")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .displayName("Record Writer")
+ .description("The Record Writer to use in order to serialize
the data before sending to Pulsar")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Maximum Async Requests")
+ .description("The number of records to combine into a single
flow file.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1000")
+ .build();
+
+ public static final PropertyDescriptor MAX_WAIT_TIME = new
PropertyDescriptor.Builder()
+ .name("Max Wait Time")
+ .description("The maximum amount of time allowed for a Pulsar
consumer to poll a subscription for data "
+ + ", zero means there is no limit. Max time less than
1 second will be equal to zero.")
+ .defaultValue("2 seconds")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final Relationship REL_PARSE_FAILURE = new
Relationship.Builder()
+ .name("parse_failure")
+ .description("FlowFiles for which the content cannot be
parsed.")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES;
+ private static final Set<Relationship> RELATIONSHIPS;
+
+ static {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER);
+ properties.add(RECORD_WRITER);
+ properties.add(BATCH_SIZE);
+ properties.add(MAX_WAIT_TIME);
+ properties.addAll(AbstractPulsarConsumerProcessor.PROPERTIES);
+ PROPERTIES = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_PARSE_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER)
+ .asControllerService(RecordReaderFactory.class);
+
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER)
+ .asControllerService(RecordSetWriterFactory.class);
+
+ try {
+ Consumer<byte[]> consumer = getConsumer(context,
getConsumerId(context, session.get()));
+
+ if (consumer == null) { /* If we aren't connected to Pulsar,
then just yield */
+ context.yield();
+ return;
+ }
+
+ if (context.getProperty(ASYNC_ENABLED).isSet() &&
context.getProperty(ASYNC_ENABLED).asBoolean()) {
+ consumeAsync(consumer, context, session);
+ handleAsync(context, session, consumer, readerFactory,
writerFactory);
+ } else {
+ consumeMessage(session, consumer, consumer.receive(),
readerFactory, writerFactory);
+ }
+ } catch (PulsarClientException e) {
+ getLogger().error("Unable to consume from Pulsar Topic ", e);
+ context.yield();
+ throw new ProcessException(e);
+ }
+ }
+
+ private void consumeMessage(ProcessSession session, final
Consumer<byte[]> consumer, final Message<byte[]> msg,
+ final RecordReaderFactory readerFactory,
RecordSetWriterFactory writerFactory) throws PulsarClientException {
+ consumeMessage(session, consumer, msg, false, readerFactory,
writerFactory);
+ }
+
+ /**
+ * Consumes a single message, parses it, and returns.
+ */
+ private void consumeMessage(ProcessSession session, final
Consumer<byte[]> consumer, final Message<byte[]> msg, boolean async,
+ final RecordReaderFactory readerFactory, RecordSetWriterFactory
writerFactory) throws PulsarClientException {
+
+ RecordSetWriter writer = null;
+ FlowFile flowFile = session.create();
+ OutputStream rawOut = null;
+ RecordReader reader = getRecordReader(msg, readerFactory, session);
+ Record firstRecord = getFirstRecord(msg, reader, session);
+
+ // We have a message that we cannot parse.
+ if (firstRecord == null) {
+ if (!async) {
+ consumer.acknowledge(msg);
+ } else {
+ getAckService().submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ return consumer.acknowledgeAsync(msg).get();
--- End diff --
are we only pulling a single message from pulsar each time? We should pull
a ton of them...and write out the records.. It looks liket his is a single
pulsar message but then we try to extract multiple records from it. That is a
fine path but the performant path will be to have many messages turned into a
large set of records that go in a single flow file and repeat.
> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord,
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
> Key: NIFI-4914
> URL: https://issues.apache.org/jira/browse/NIFI-4914
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.6.0
> Reporter: David Kjerrumgaard
> Priority: Minor
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)