exceptionfactory commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1643159297


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import 
org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper;
+import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper;
+import 
org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class Kafka3ProducerService implements KafkaProducerService {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final Producer<byte[], byte[]> producer;
+    private final List<ProducerCallback> callbacks;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final KafkaProducerWrapper wrapper;
+
+    public Kafka3ProducerService(final Properties properties,
+                                 final ServiceConfiguration 
serviceConfiguration,
+                                 final ProducerConfiguration 
producerConfiguration) {
+        final ByteArraySerializer serializer = new ByteArraySerializer();
+        this.producer = new KafkaProducer<>(properties, serializer, 
serializer);
+        this.callbacks = new ArrayList<>();
+
+        this.serviceConfiguration = serviceConfiguration;
+
+        this.wrapper = producerConfiguration.getTransactionsEnabled()
+                ? new KafkaTransactionalProducerWrapper(producer)
+                : new KafkaNonTransactionalProducerWrapper(producer);
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    @Override
+    public void init() {
+        wrapper.init();
+    }
+
+    @Override
+    public void send(final Iterator<KafkaRecord> kafkaRecords, final 
PublishContext publishContext) {
+        final ProducerCallback callback = new 
ProducerCallback(publishContext.getFlowFile());
+        callbacks.add(callback);
+        Optional.ofNullable(publishContext.getException()).ifPresent(e -> 
callback.getExceptions().add(e));
+        if (callback.getExceptions().isEmpty()) {
+            try {
+                wrapper.send(kafkaRecords, publishContext, callback);
+                logger.trace("send():inFlight");
+            } catch (final UncheckedIOException e) {
+                callback.getExceptions().add(e);
+                logger.trace("send():{}}", e.getMessage());

Review Comment:
   Recommend removing this trace statement; note also that the format string contains a stray closing brace (`"send():{}}"`), so the message would render incorrectly even if kept.
   ```suggestion
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import 
org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper;
+import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper;
+import 
org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class Kafka3ProducerService implements KafkaProducerService {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final Producer<byte[], byte[]> producer;
+    private final List<ProducerCallback> callbacks;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final KafkaProducerWrapper wrapper;
+
+    public Kafka3ProducerService(final Properties properties,
+                                 final ServiceConfiguration 
serviceConfiguration,
+                                 final ProducerConfiguration 
producerConfiguration) {
+        final ByteArraySerializer serializer = new ByteArraySerializer();
+        this.producer = new KafkaProducer<>(properties, serializer, 
serializer);
+        this.callbacks = new ArrayList<>();
+
+        this.serviceConfiguration = serviceConfiguration;
+
+        this.wrapper = producerConfiguration.getTransactionsEnabled()
+                ? new KafkaTransactionalProducerWrapper(producer)
+                : new KafkaNonTransactionalProducerWrapper(producer);
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    @Override
+    public void init() {
+        wrapper.init();
+    }
+
+    @Override
+    public void send(final Iterator<KafkaRecord> kafkaRecords, final 
PublishContext publishContext) {
+        final ProducerCallback callback = new 
ProducerCallback(publishContext.getFlowFile());
+        callbacks.add(callback);
+        Optional.ofNullable(publishContext.getException()).ifPresent(e -> 
callback.getExceptions().add(e));
+        if (callback.getExceptions().isEmpty()) {
+            try {
+                wrapper.send(kafkaRecords, publishContext, callback);
+                logger.trace("send():inFlight");

Review Comment:
   Since this trace message carries no other context (no identifiers or parameters), recommend removing it.
   ```suggestion
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/consume/ConsumeKafkaConfigureTest.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.consume;
+
+import org.apache.nifi.kafka.processors.ConsumeKafka;
+import org.apache.nifi.kafka.service.Kafka3ConnectionService;
+import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+class ConsumeKafkaConfigureTest {
+
+    private static final String CONNECTION_SERVICE_ID = 
Kafka3ConnectionService.class.getSimpleName();
+    private static final String CONFIG_BOOTSTRAP_SERVERS = 
"PLAINTEXT://localhost:50000";
+    private static final String CONFIG_MAX_POLL_RECORDS = "1234";
+
+    @Test
+    void testConfigureMaxPollRecords() throws InitializationException, 
NoSuchFieldException, IllegalAccessException {
+        final Kafka3ConnectionService connectionService = new 
Kafka3ConnectionService();
+        final Class<?> serviceClass = connectionService.getClass();
+
+        final TestRunner runner = 
TestRunners.newTestRunner(ConsumeKafka.class);
+        runner.addControllerService(CONNECTION_SERVICE_ID, connectionService);
+        runner.setProperty(connectionService, 
Kafka3ConnectionService.BOOTSTRAP_SERVERS, CONFIG_BOOTSTRAP_SERVERS);
+        runner.setProperty(connectionService, 
Kafka3ConnectionService.MAX_POLL_RECORDS, CONFIG_MAX_POLL_RECORDS);
+        runner.enableControllerService(connectionService);
+
+        final Field field = serviceClass.getDeclaredField("clientProperties"); 
 // properties used to init ConsumerService
+        field.setAccessible(true);

Review Comment:
   Using reflection to read private fields is generally fragile in tests: it breaks silently when internal field names change, and the resulting failures are hard to diagnose. Since this test 
class is focused on configuration, I recommend removing it entirely. The 
variety of other tests seem to provide sufficient coverage.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordStreamKafkaRecordConverter.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.convert;
+
+import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.value.RecordValueFactory;
+import org.apache.nifi.kafka.processors.producer.value.ValueFactory;
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaRecordConverter} implementation for transforming NiFi
+ * {@link org.apache.nifi.serialization.record.Record} objects to {@link 
KafkaRecord} for publish to
+ * Kafka.
+ */
+public class RecordStreamKafkaRecordConverter implements KafkaRecordConverter {
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final HeadersFactory headersFactory;
+    private final KeyFactory keyFactory;
+    private final int maxMessageSize;
+    private final ComponentLog logger;
+
+    public RecordStreamKafkaRecordConverter(
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final HeadersFactory headersFactory,
+            final KeyFactory keyFactory,
+            final int maxMessageSize,
+            final ComponentLog logger) {
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.headersFactory = headersFactory;
+        this.keyFactory = keyFactory;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+    }
+
+    @Override
+    public Iterator<KafkaRecord> convert(
+            final Map<String, String> attributes, final InputStream in, final 
long inputLength)
+            throws IOException {
+        try {
+            final RecordReader reader = 
readerFactory.createRecordReader(attributes, in, inputLength, logger);
+            final RecordSet recordSet = reader.createRecordSet();
+            final RecordSchema schema = writerFactory.getSchema(attributes, 
recordSet.getSchema());
+
+            final ByteArrayOutputStream os = new ByteArrayOutputStream();
+            final RecordSetWriter writer = writerFactory.createWriter(logger, 
schema, os, attributes);
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            return toKafkaRecordIterator(attributes, os, writer, 
pushBackRecordSet);
+        } catch (MalformedRecordException | SchemaNotFoundException e) {
+            throw new IOException("Stream to Record conversion failed", e);
+        }
+    }
+
+    private Iterator<KafkaRecord> toKafkaRecordIterator(
+            final Map<String, String> attributes,
+            final ByteArrayOutputStream os,
+            final RecordSetWriter writer,
+            final PushBackRecordSet pushBackRecordSet) throws IOException {
+        final ValueFactory valueFactory = new RecordValueFactory(os, writer);
+        final List<RecordHeader> headers = 
headersFactory.getHeaders(attributes);
+
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                try {
+                    return pushBackRecordSet.isAnotherRecord();
+                } catch (final IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+
+            @Override
+            public KafkaRecord next() {
+                try {
+                    final Record record = pushBackRecordSet.next();
+                    final byte[] key = keyFactory.getKey(attributes, record);
+                    final byte[] value = valueFactory.getValue(record);
+                    ProducerUtils.checkMessageSize(maxMessageSize, 
value.length);
+                    return new KafkaRecord(null, null, null, key, value, 
headers);
+                } catch (final IOException e) {
+                    throw new UncheckedIOException(e);

Review Comment:
   A message should be included:
   ```suggestion
                       throw new UncheckedIOException("Record conversion 
failed", e);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.convert;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
+import org.apache.nifi.kafka.processors.producer.wrapper.RecordFieldConverter;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord;
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaRecordConverter} implementation for transforming NiFi
+ * {@link org.apache.nifi.serialization.record.Record} objects to {@link 
KafkaRecord} for publish to
+ * Kafka.
+ */
+public class RecordWrapperStreamKafkaRecordConverter implements 
KafkaRecordConverter {
+    private final FlowFile flowFile;
+    private final RecordMetadataStrategy metadataStrategy;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordSetWriterFactory keyWriterFactory;
+    private final int maxMessageSize;
+    private final ComponentLog logger;
+
+    public RecordWrapperStreamKafkaRecordConverter(
+            final FlowFile flowFile,
+            final RecordMetadataStrategy metadataStrategy,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final RecordSetWriterFactory keyWriterFactory,
+            final int maxMessageSize,
+            final ComponentLog logger) {
+        this.flowFile = flowFile;
+        this.metadataStrategy = metadataStrategy;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.keyWriterFactory = keyWriterFactory;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+    }
+
+    @Override
+    public Iterator<KafkaRecord> convert(
+            final Map<String, String> attributes, final InputStream in, final 
long inputLength)
+            throws IOException {
+        try {
+            final RecordReader reader = 
readerFactory.createRecordReader(attributes, in, inputLength, logger);
+            final RecordSet recordSet = reader.createRecordSet();
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            return toKafkaRecordIterator(pushBackRecordSet);
+        } catch (MalformedRecordException | SchemaNotFoundException e) {
+            throw new IOException("Stream to Record conversion failed", e);
+        }
+    }
+
+    private Iterator<KafkaRecord> toKafkaRecordIterator(
+            final PushBackRecordSet pushBackRecordSet) {
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                try {
+                    return pushBackRecordSet.isAnotherRecord();
+                } catch (final IOException e) {
+                    throw new UncheckedIOException(e);

Review Comment:
   A message should be included:
   ```suggestion
                       throw new UncheckedIOException("Record evaluation 
failed", e);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.convert;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.ProducerUtils;
+import org.apache.nifi.kafka.processors.producer.wrapper.RecordFieldConverter;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord;
+import org.apache.nifi.kafka.service.api.header.RecordHeader;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.PushBackRecordSet;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaRecordConverter} implementation for transforming NiFi
+ * {@link org.apache.nifi.serialization.record.Record} objects to {@link 
KafkaRecord} for publish to
+ * Kafka.
+ */
+public class RecordWrapperStreamKafkaRecordConverter implements 
KafkaRecordConverter {
+    private final FlowFile flowFile;
+    private final RecordMetadataStrategy metadataStrategy;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordSetWriterFactory keyWriterFactory;
+    private final int maxMessageSize;
+    private final ComponentLog logger;
+
+    public RecordWrapperStreamKafkaRecordConverter(
+            final FlowFile flowFile,
+            final RecordMetadataStrategy metadataStrategy,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final RecordSetWriterFactory keyWriterFactory,
+            final int maxMessageSize,
+            final ComponentLog logger) {
+        this.flowFile = flowFile;
+        this.metadataStrategy = metadataStrategy;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.keyWriterFactory = keyWriterFactory;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+    }
+
+    @Override
+    public Iterator<KafkaRecord> convert(
+            final Map<String, String> attributes, final InputStream in, final 
long inputLength)
+            throws IOException {
+        try {
+            final RecordReader reader = 
readerFactory.createRecordReader(attributes, in, inputLength, logger);
+            final RecordSet recordSet = reader.createRecordSet();
+            final PushBackRecordSet pushBackRecordSet = new 
PushBackRecordSet(recordSet);
+            return toKafkaRecordIterator(pushBackRecordSet);
+        } catch (MalformedRecordException | SchemaNotFoundException e) {
+            throw new IOException("Stream to Record conversion failed", e);
+        }
+    }
+
+    private Iterator<KafkaRecord> toKafkaRecordIterator(
+            final PushBackRecordSet pushBackRecordSet) {
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                try {
+                    return pushBackRecordSet.isAnotherRecord();
+                } catch (final IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+
+            @Override
+            public KafkaRecord next() {
+                try {
+                    final Record record = pushBackRecordSet.next();
+                    final RecordFieldConverter converter = new 
RecordFieldConverter(record, flowFile, logger);
+                    final byte[] key = converter.toBytes(WrapperRecord.KEY, 
keyWriterFactory);
+                    final byte[] value = 
converter.toBytes(WrapperRecord.VALUE, writerFactory);
+                    ProducerUtils.checkMessageSize(maxMessageSize, 
value.length);
+
+                    final List<RecordHeader> headers = getKafkaHeaders(record);
+
+                    // wrapper record may specify custom topic / partition
+                    String topic = null;
+                    Integer partition = null;
+                    if (metadataStrategy == 
RecordMetadataStrategy.FROM_RECORD) {
+                        final MapRecord myMetadataRecord = (MapRecord) 
record.getValue(WrapperRecord.METADATA);
+                        topic = 
myMetadataRecord.getAsString(WrapperRecord.TOPIC);
+                        partition = 
myMetadataRecord.getAsInt(WrapperRecord.PARTITION);
+                    }
+
+                    return new KafkaRecord(topic, partition, null, key, value, 
headers);
+                } catch (final IOException e) {
+                    throw new UncheckedIOException(e);

Review Comment:
   ```suggestion
                       throw new UncheckedIOException("Record conversion 
failed", e);
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.wrapper;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class RecordFieldConverter {
+    private final Record record;
+    private final FlowFile flowFile;
+    private final ComponentLog logger;
+
+    public RecordFieldConverter(final Record record, final FlowFile flowFile, 
final ComponentLog logger) {
+        this.record = record;
+        this.flowFile = flowFile;
+        this.logger = logger;
+    }
+
+    public byte[] toBytes(final String fieldName, final RecordSetWriterFactory 
writerFactory) throws IOException {
+        final byte[] bytes;
+
+        try {
+            final Object field = record.getValue(fieldName);
+            if (field == null) {
+                bytes = null;
+            } else if (field instanceof Record) {
+                bytes = toBytes((Record) field, writerFactory);
+            } else if (field instanceof Byte[]) {
+                bytes = toBytes((Byte[]) field);
+            } else if (field instanceof Object[]) {
+                bytes = toBytes((Object[]) field);
+            } else if (field instanceof String) {
+                bytes = toBytes((String) field);
+            } else {
+                throw new MalformedRecordException(String.format("Couldn't 
convert %s record data to byte array.", fieldName));

Review Comment:
   Recommend avoiding contractions and periods in messages:
   ```suggestion
                   throw new MalformedRecordException(String.format("Failed to 
convert [%s] record data to byte array", fieldName));
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import 
org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as either a message 
or as individual records to Apache Kafka using the Kafka Producer API. "
+        + "The messages to send may be individual FlowFiles, may be delimited 
using a "
+        + "user-specified delimiter (such as a new-line), or "
+        + "may be record-oriented data that can be read by the configured 
Record Reader. "
+        + "The complementary NiFi processor for fetching messages is 
ConsumeKafka.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, 
description = "If this attribute is set to 'true', if the processor is not 
configured "
+        + "with a demarcator and if the FlowFile's content is null, then a 
tombstone message with zero bytes will be sent to Kafka.")
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({ConsumeKafka.class})
+public class PublishKafka extends AbstractProcessor implements 
KafkaPublishComponent, VerifiableProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    public static final PropertyDescriptor CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Kafka Connection Service")
+            .description("Provides connections to Kafka Broker for publishing 
Kafka Records")
+            .identifiesControllerService(KafkaConnectionService.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("Name of the Kafka Topic to which the Processor 
publishes Kafka Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to Kafka")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PUBLISH_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Publish Strategy")
+            .description("The format used to publish the incoming FlowFile 
record to Kafka.")
+            .required(true)
+            .defaultValue(PublishStrategy.USE_VALUE.getValue())
+            .allowableValues(PublishStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Message Key Field")
+            .description("The name of a field in the Input Records that should 
be used as the Key for the Kafka message.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                    + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                    + "To enter special character such as 'new line' use 
CTRL+Enter or Shift+Enter, depending on your OS.")
+            .build();
+
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds 
to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Kafka Key")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' 
is used as the message key, if it is present."
+                    + "Beware that setting Kafka key and demarcating at the 
same time may potentially lead to many Kafka messages with the same key."
+                    + "Normally this is not a problem as Kafka does not 
enforce or assume message and key uniqueness. Still, setting the demarcator and 
Kafka key at the same time poses a risk of "
+                    + "data loss on Kafka. During a topic compaction on Kafka, 
messages will be deduplicated based on this key.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named 
'" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the 
value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name("compression.type")
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new 
PropertyDescriptor.Builder()
+            .name("Attributes to Send as Headers (Regex)")
+            .description("A Regular Expression that is matched against all 
FlowFile attribute names. "
+                    + "Any attribute whose name matches the regex will be 
added to the Kafka messages as a Header. "
+                    + "If not specified, no FlowFile attributes will be added 
as headers.")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor TRANSACTIONS_ENABLED = new 
PropertyDescriptor.Builder()
+            .name("Transactions Enabled")
+            .description("Specifies whether or not NiFi should provide 
Transactional guarantees when communicating with Kafka. If there is a problem 
sending data to Kafka, "
+                    + "and this property is set to false, then the messages 
that have already been sent to Kafka will continue on and be delivered to 
consumers. "
+                    + "If this is set to true, then the Kafka transaction will 
be rolled back so that those messages are not available to consumers. Setting 
this to true "
+                    + "requires that the [Delivery Guarantee] property be set 
to [Guarantee Replicated Delivery.]")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new 
PropertyDescriptor.Builder()
+            .name("Transactional Id Prefix")
+            .description("When [Transactions Enabled] is set to true, 
KafkaProducer config 'transactional.id' will be a generated UUID and will be 
prefixed with this string.")
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .dependsOn(TRANSACTIONS_ENABLED, "true")
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Message Header Encoding")
+            .description("For any attribute that is added as a message header, 
as configured via the <Attributes to Send as Headers> property, "
+                    + "this property indicates the Character Encoding to use 
for serializing the headers.")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue("UTF-8")
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor RECORD_KEY_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Key Writer")
+            .description("The Record Key Writer to use for outgoing FlowFiles")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(PUBLISH_STRATEGY, 
PublishStrategy.USE_WRAPPER.getValue())
+            .build();
+
+    public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Record Metadata Strategy")
+            .description("Specifies whether the Record's metadata (topic and 
partition) should come from the Record's metadata field or if it should come 
from the configured " +
+                    "Topic Name and Partition / Partitioner class properties")
+            .required(true)
+            .defaultValue(RecordMetadataStrategy.FROM_PROPERTIES.getValue())
+            .allowableValues(RecordMetadataStrategy.class)
+            .dependsOn(PUBLISH_STRATEGY, 
PublishStrategy.USE_WRAPPER.getValue())
+            .build();
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner",
+            "RoundRobinPartitioner",
+            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
+                    + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+            "DefaultPartitioner", "The default partitioning strategy will 
choose the sticky partition that changes when the batch is full "
+            + "(See KIP-480 for details about sticky partitioning).");
+    public static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new 
AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.ExpressionLanguagePartitioner",

Review Comment:
   Is there a reason this is public versus the others? Recommend moving all 
values to a separate `enum`.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import 
org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as either a message 
or as individual records to Apache Kafka using the Kafka Producer API. "
+        + "The messages to send may be individual FlowFiles, may be delimited 
using a "
+        + "user-specified delimiter (such as a new-line), or "
+        + "may be record-oriented data that can be read by the configured 
Record Reader. "
+        + "The complementary NiFi processor for fetching messages is 
ConsumeKafka.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, 
description = "If this attribute is set to 'true', if the processor is not 
configured "
+        + "with a demarcator and if the FlowFile's content is null, then a 
tombstone message with zero bytes will be sent to Kafka.")
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({ConsumeKafka.class})
+public class PublishKafka extends AbstractProcessor implements 
KafkaPublishComponent, VerifiableProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    public static final PropertyDescriptor CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Kafka Connection Service")
+            .description("Provides connections to Kafka Broker for publishing 
Kafka Records")
+            .identifiesControllerService(KafkaConnectionService.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("Name of the Kafka Topic to which the Processor 
publishes Kafka Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to Kafka")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PUBLISH_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Publish Strategy")
+            .description("The format used to publish the incoming FlowFile 
record to Kafka.")
+            .required(true)
+            .defaultValue(PublishStrategy.USE_VALUE.getValue())
+            .allowableValues(PublishStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Message Key Field")
+            .description("The name of a field in the Input Records that should 
be used as the Key for the Kafka message.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                    + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                    + "To enter special character such as 'new line' use 
CTRL+Enter or Shift+Enter, depending on your OS.")
+            .build();
+
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds 
to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Kafka Key")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' 
is used as the message key, if it is present."
+                    + "Beware that setting Kafka key and demarcating at the 
same time may potentially lead to many Kafka messages with the same key."
+                    + "Normally this is not a problem as Kafka does not 
enforce or assume message and key uniqueness. Still, setting the demarcator and 
Kafka key at the same time poses a risk of "
+                    + "data loss on Kafka. During a topic compaction on Kafka, 
messages will be deduplicated based on this key.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named 
'" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the 
value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name("compression.type")
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new 
PropertyDescriptor.Builder()
+            .name("Attributes to Send as Headers (Regex)")

Review Comment:
   The parenthetical should be removed from the name. What do you think of an 
alternative name?
   ```suggestion
               .name("FlowFile Attribute Header Pattern")
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import 
org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as either a message 
or as individual records to Apache Kafka using the Kafka Producer API. "
+        + "The messages to send may be individual FlowFiles, may be delimited 
using a "
+        + "user-specified delimiter (such as a new-line), or "
+        + "may be record-oriented data that can be read by the configured 
Record Reader. "
+        + "The complementary NiFi processor for fetching messages is 
ConsumeKafka.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, 
description = "If this attribute is set to 'true', if the processor is not 
configured "
+        + "with a demarcator and if the FlowFile's content is null, then a 
tombstone message with zero bytes will be sent to Kafka.")
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({ConsumeKafka.class})
+public class PublishKafka extends AbstractProcessor implements 
KafkaPublishComponent, VerifiableProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    public static final PropertyDescriptor CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Kafka Connection Service")
+            .description("Provides connections to Kafka Broker for publishing 
Kafka Records")
+            .identifiesControllerService(KafkaConnectionService.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("Name of the Kafka Topic to which the Processor 
publishes Kafka Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to Kafka")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PUBLISH_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Publish Strategy")
+            .description("The format used to publish the incoming FlowFile 
record to Kafka.")
+            .required(true)
+            .defaultValue(PublishStrategy.USE_VALUE.getValue())
+            .allowableValues(PublishStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Message Key Field")
+            .description("The name of a field in the Input Records that should 
be used as the Key for the Kafka message.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                    + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                    + "To enter special character such as 'new line' use 
CTRL+Enter or Shift+Enter, depending on your OS.")
+            .build();
+
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds 
to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Kafka Key")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' 
is used as the message key, if it is present."
+                    + "Beware that setting Kafka key and demarcating at the 
same time may potentially lead to many Kafka messages with the same key."
+                    + "Normally this is not a problem as Kafka does not 
enforce or assume message and key uniqueness. Still, setting the demarcator and 
Kafka key at the same time poses a risk of "
+                    + "data loss on Kafka. During a topic compaction on Kafka, 
messages will be deduplicated based on this key.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named 
'" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the 
value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name("compression.type")
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new 
PropertyDescriptor.Builder()
+            .name("Attributes to Send as Headers (Regex)")
+            .description("A Regular Expression that is matched against all 
FlowFile attribute names. "
+                    + "Any attribute whose name matches the regex will be 
added to the Kafka messages as a Header. "
+                    + "If not specified, no FlowFile attributes will be added 
as headers.")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor TRANSACTIONS_ENABLED = new 
PropertyDescriptor.Builder()
+            .name("Transactions Enabled")
+            .description("Specifies whether or not NiFi should provide 
Transactional guarantees when communicating with Kafka. If there is a problem 
sending data to Kafka, "
+                    + "and this property is set to false, then the messages 
that have already been sent to Kafka will continue on and be delivered to 
consumers. "
+                    + "If this is set to true, then the Kafka transaction will 
be rolled back so that those messages are not available to consumers. Setting 
this to true "
+                    + "requires that the [Delivery Guarantee] property be set 
to [Guarantee Replicated Delivery.]")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new 
PropertyDescriptor.Builder()
+            .name("Transactional Id Prefix")

Review Comment:
   ```suggestion
               .name("Transactional ID Prefix")
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/config/DeliveryGuarantee.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.config;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+public class DeliveryGuarantee {
+
+    // 
https://github.com/apache/kafka/blob/5fa48214448ddf19270a35f1dd5156a4eece4ca7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L117
+    public static final String ACKS_CONFIG = "acks";
+
+    public static final AllowableValue DELIVERY_REPLICATED = new 
AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to failure unless the message is 
replicated to the appropriate "
+                    + "number of Kafka Nodes according to the Topic 
configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new 
AllowableValue("1", "Guarantee Single Node Delivery",
+            "FlowFile will be routed to success if the message is received by 
a single Kafka node, "
+                    + "whether or not it is replicated. This is faster than 
<Guarantee Replicated Delivery> "
+                    + "but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new 
AllowableValue("0", "Best Effort",
+            "FlowFile will be routed to success after successfully sending the 
content to a Kafka node, "
+                    + "without waiting for any acknowledgment from the node at 
all. This provides the best performance but may result in data loss.");

Review Comment:
   Recommend refactoring these `AllowableValue` constants to an `enum`, following the pattern already used elsewhere in this PR (e.g. `allowableValues(PublishStrategy.class)` and `allowableValues(KeyEncoding.class)`), so the property can reference the enum class directly.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaProducerWrapper.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer.txn;

Review Comment:
   Recommend spelling out the name `transaction`:
   ```suggestion
   package org.apache.nifi.kafka.service.producer.transaction;
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import 
org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import 
org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import 
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as either a message 
or as individual records to Apache Kafka using the Kafka Producer API. "
+        + "The messages to send may be individual FlowFiles, may be delimited 
using a "
+        + "user-specified delimiter (such as a new-line), or "
+        + "may be record-oriented data that can be read by the configured 
Record Reader. "
+        + "The complementary NiFi processor for fetching messages is 
ConsumeKafka.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, 
description = "If this attribute is set to 'true', if the processor is not 
configured "
+        + "with a demarcator and if the FlowFile's content is null, then a 
tombstone message with zero bytes will be sent to Kafka.")
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Kafka for this FlowFile. This attribute is added 
only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({ConsumeKafka.class})
+public class PublishKafka extends AbstractProcessor implements 
KafkaPublishComponent, VerifiableProcessor {
+    protected static final String MSG_COUNT = "msg.count";
+
+    public static final PropertyDescriptor CONNECTION_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("Kafka Connection Service")
+            .description("Provides connections to Kafka Broker for publishing 
Kafka Records")
+            .identifiesControllerService(KafkaConnectionService.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TOPIC_NAME = new 
PropertyDescriptor.Builder()
+            .name("Topic Name")
+            .description("Name of the Kafka Topic to which the Processor 
publishes Kafka Records")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .description("The Record Reader to use for incoming FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .description("The Record Writer to use in order to serialize the 
data before sending to Kafka")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    public static final PropertyDescriptor PUBLISH_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Publish Strategy")
+            .description("The format used to publish the incoming FlowFile 
record to Kafka.")
+            .required(true)
+            .defaultValue(PublishStrategy.USE_VALUE.getValue())
+            .allowableValues(PublishStrategy.class)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_KEY_FIELD = new 
PropertyDescriptor.Builder()
+            .name("Message Key Field")
+            .description("The name of a field in the Input Records that should 
be used as the Key for the Kafka message.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new 
PropertyDescriptor.Builder()
+            .name("Message Demarcator")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .description("Specifies the string (interpreted as UTF-8) to use 
for demarcating multiple messages within "
+                    + "a single FlowFile. If not specified, the entire content 
of the FlowFile will be used as a single message. If specified, the "
+                    + "contents of the FlowFile will be split on this 
delimiter and each section sent as a separate Kafka message. "
+                    + "To enter special character such as 'new line' use 
CTRL+Enter or Shift+Enter, depending on your OS.")
+            .build();
+
+    public static final PropertyDescriptor MAX_REQUEST_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds 
to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
+    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+            .name("Kafka Key")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' 
is used as the message key, if it is present."
+                    + "Beware that setting Kafka key and demarcating at the 
same time may potentially lead to many Kafka messages with the same key."
+                    + "Normally this is not a problem as Kafka does not 
enforce or assume message and key uniqueness. Still, setting the demarcator and 
Kafka key at the same time poses a risk of "
+                    + "data loss on Kafka. During a topic compaction on Kafka, 
messages will be deduplicated based on this key.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named 
'" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the 
value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
+            .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new 
PropertyDescriptor.Builder()
+            .name("compression.type")
+            .displayName("Compression Type")
+            .description("This parameter allows you to specify the compression 
codec for all data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues("none", "gzip", "snappy", "lz4")
+            .defaultValue("none")
+            .build();
+
+    public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new 
PropertyDescriptor.Builder()
+            .name("Attributes to Send as Headers (Regex)")
+            .description("A Regular Expression that is matched against all 
FlowFile attribute names. "
+                    + "Any attribute whose name matches the regex will be 
added to the Kafka messages as a Header. "
+                    + "If not specified, no FlowFile attributes will be added 
as headers.")
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue())
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor TRANSACTIONS_ENABLED = new 
PropertyDescriptor.Builder()
+            .name("Transactions Enabled")
+            .description("Specifies whether or not NiFi should provide 
Transactional guarantees when communicating with Kafka. If there is a problem 
sending data to Kafka, "
+                    + "and this property is set to false, then the messages 
that have already been sent to Kafka will continue on and be delivered to 
consumers. "
+                    + "If this is set to true, then the Kafka transaction will 
be rolled back so that those messages are not available to consumers. Setting 
this to true "
+                    + "requires that the [Delivery Guarantee] property be set 
to [Guarantee Replicated Delivery.]")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new 
PropertyDescriptor.Builder()
+            .name("Transactional Id Prefix")
+            .description("When [Transactions Enabled] is set to true, 
KafkaProducer config 'transactional.id' will be a generated UUID and will be 
prefixed with this string.")
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .dependsOn(TRANSACTIONS_ENABLED, "true")
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("Message Header Encoding")
+            .description("For any attribute that is added as a message header, 
as configured via the <Attributes to Send as Headers> property, "
+                    + "this property indicates the Character Encoding to use 
for serializing the headers.")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue("UTF-8")
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor RECORD_KEY_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Key Writer")
+            .description("The Record Key Writer to use for outgoing FlowFiles")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .dependsOn(PUBLISH_STRATEGY, 
PublishStrategy.USE_WRAPPER.getValue())
+            .build();
+
// Chooses the source of each record's topic/partition metadata: the record's own
// metadata field, or the processor's configured properties. Only shown when
// PUBLISH_STRATEGY is USE_WRAPPER (enforced via dependsOn); defaults to FROM_PROPERTIES.
public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder()
        .name("Record Metadata Strategy")
        .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " +
                "Topic Name and Partition / Partitioner class properties")
        .required(true)
        .defaultValue(RecordMetadataStrategy.FROM_PROPERTIES.getValue())
        .allowableValues(RecordMetadataStrategy.class)
        .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue())
        .build();
+
+    static final AllowableValue ROUND_ROBIN_PARTITIONING = new 
AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner",
+            "RoundRobinPartitioner",
+            "Messages will be assigned partitions in a round-robin fashion, 
sending the first message to Partition 1, "
+                    + "the next Partition to Partition 2, and so on, wrapping 
as necessary.");
+    static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+            "DefaultPartitioner", "The default partitioning strategy will 
choose the sticky partition that changes when the batch is full "
+            + "(See KIP-480 for details about sticky partitioning).");
+    public static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new 
AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.ExpressionLanguagePartitioner",
+            "Expression Language Partitioner",
+            "Interprets the <Partition> property as Expression Language that 
will be evaluated against each FlowFile. This Expression will be evaluated once 
against the FlowFile, " +
+                    "so all Records in a given FlowFile will go to the same 
partition.");
+
// Maps directly to Kafka's 'partitioner.class' producer config (see name below).
// Defaults to the sticky DefaultPartitioner; the Expression Language option works
// together with the PARTITION property.
static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
        .name("partitioner.class")
        .displayName("Partitioner class")
        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
        .defaultValue(RANDOM_PARTITIONING.getValue())
        .required(false)
        .build();
+
// Target partition for outgoing records; interpretation depends on the selected
// PARTITION_CLASS. Supports per-FlowFile Expression Language, validated non-empty.
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
        .name("partition")
        .displayName("Partition")
        .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
        .required(false)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        .build();
+
+    private static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            CONNECTION_SERVICE,
+            DeliveryGuarantee.DELIVERY_GUARANTEE,
+            TOPIC_NAME,
+            RECORD_READER,
+            RECORD_WRITER,
+            PUBLISH_STRATEGY,
+            RECORD_KEY_WRITER,
+            RECORD_METADATA_STRATEGY,
+            MESSAGE_DEMARCATOR,
+            FAILURE_STRATEGY,
+            KEY,
+            KEY_ATTRIBUTE_ENCODING,
+            ATTRIBUTE_NAME_REGEX,
+            TRANSACTIONS_ENABLED,
+            TRANSACTIONAL_ID_PREFIX,
+            MESSAGE_HEADER_ENCODING,
+            MESSAGE_KEY_FIELD,
+            MAX_REQUEST_SIZE,
+            COMPRESSION_CODEC,
+            PARTITION_CLASS,
+            PARTITION
+    ));
+
/**
 * @return the ordered list of property descriptors supported by this processor
 */
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return DESCRIPTORS;
}
+
// Destination for FlowFiles whose content was fully published to Kafka.
public static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles for which all content was sent to Kafka.")
        .build();
+
// Destination for FlowFiles that could not be published to Kafka.
public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
        .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

Review Comment:
   ```suggestion
       private static final Set<Relationship> RELATIONSHIPS = 
Set.of(REL_SUCCESS, REL_FAILURE);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to