exceptionfactory commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1643159297
########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.common.ServiceConfiguration; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper; +import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper; +import 
org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +public class Kafka3ProducerService implements KafkaProducerService { + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final Producer<byte[], byte[]> producer; + private final List<ProducerCallback> callbacks; + + private final ServiceConfiguration serviceConfiguration; + + private final KafkaProducerWrapper wrapper; + + public Kafka3ProducerService(final Properties properties, + final ServiceConfiguration serviceConfiguration, + final ProducerConfiguration producerConfiguration) { + final ByteArraySerializer serializer = new ByteArraySerializer(); + this.producer = new KafkaProducer<>(properties, serializer, serializer); + this.callbacks = new ArrayList<>(); + + this.serviceConfiguration = serviceConfiguration; + + this.wrapper = producerConfiguration.getTransactionsEnabled() + ? 
new KafkaTransactionalProducerWrapper(producer) + : new KafkaNonTransactionalProducerWrapper(producer); + } + + @Override + public void close() { + producer.close(); + } + + @Override + public void init() { + wrapper.init(); + } + + @Override + public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext) { + final ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile()); + callbacks.add(callback); + Optional.ofNullable(publishContext.getException()).ifPresent(e -> callback.getExceptions().add(e)); + if (callback.getExceptions().isEmpty()) { + try { + wrapper.send(kafkaRecords, publishContext, callback); + logger.trace("send():inFlight"); + } catch (final UncheckedIOException e) { + callback.getExceptions().add(e); + logger.trace("send():{}}", e.getMessage()); Review Comment: Recommend removing. ```suggestion ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.service.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.common.ServiceConfiguration; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper; +import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper; +import org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +public class Kafka3ProducerService implements KafkaProducerService { + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final Producer<byte[], byte[]> producer; + private final List<ProducerCallback> callbacks; + + private final ServiceConfiguration serviceConfiguration; + + private final KafkaProducerWrapper wrapper; + + public Kafka3ProducerService(final Properties properties, + final ServiceConfiguration serviceConfiguration, + final ProducerConfiguration producerConfiguration) { + final ByteArraySerializer serializer = new ByteArraySerializer(); + this.producer = new KafkaProducer<>(properties, serializer, 
serializer); + this.callbacks = new ArrayList<>(); + + this.serviceConfiguration = serviceConfiguration; + + this.wrapper = producerConfiguration.getTransactionsEnabled() + ? new KafkaTransactionalProducerWrapper(producer) + : new KafkaNonTransactionalProducerWrapper(producer); + } + + @Override + public void close() { + producer.close(); + } + + @Override + public void init() { + wrapper.init(); + } + + @Override + public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext) { + final ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile()); + callbacks.add(callback); + Optional.ofNullable(publishContext.getException()).ifPresent(e -> callback.getExceptions().add(e)); + if (callback.getExceptions().isEmpty()) { + try { + wrapper.send(kafkaRecords, publishContext, callback); + logger.trace("send():inFlight"); Review Comment: Since this has no other context, recommend removing. ```suggestion ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/consume/ConsumeKafkaConfigureTest.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors.consume; + +import org.apache.nifi.kafka.processors.ConsumeKafka; +import org.apache.nifi.kafka.service.Kafka3ConnectionService; +import org.apache.nifi.kafka.shared.property.KafkaClientProperty; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.lang.reflect.Field; +import java.util.Properties; +import java.util.stream.Stream; + +class ConsumeKafkaConfigureTest { + + private static final String CONNECTION_SERVICE_ID = Kafka3ConnectionService.class.getSimpleName(); + private static final String CONFIG_BOOTSTRAP_SERVERS = "PLAINTEXT://localhost:50000"; + private static final String CONFIG_MAX_POLL_RECORDS = "1234"; + + @Test + void testConfigureMaxPollRecords() throws InitializationException, NoSuchFieldException, IllegalAccessException { + final Kafka3ConnectionService connectionService = new Kafka3ConnectionService(); + final Class<?> serviceClass = connectionService.getClass(); + + final TestRunner runner = TestRunners.newTestRunner(ConsumeKafka.class); + runner.addControllerService(CONNECTION_SERVICE_ID, connectionService); + runner.setProperty(connectionService, Kafka3ConnectionService.BOOTSTRAP_SERVERS, CONFIG_BOOTSTRAP_SERVERS); + runner.setProperty(connectionService, Kafka3ConnectionService.MAX_POLL_RECORDS, CONFIG_MAX_POLL_RECORDS); + runner.enableControllerService(connectionService); + + final Field field = serviceClass.getDeclaredField("clientProperties"); // properties used to init ConsumerService + field.setAccessible(true); Review Comment: Using reflection with this approach is generally dangerous for tests because it is subject to breaking with unclear reasons for 
failure. Since this test class is focused on configuration, I recommend removing it entirely. The variety of other tests seems to provide sufficient coverage. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordStreamKafkaRecordConverter.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors.producer.convert; + +import org.apache.nifi.kafka.processors.producer.common.ProducerUtils; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.value.RecordValueFactory; +import org.apache.nifi.kafka.processors.producer.value.ValueFactory; +import org.apache.nifi.kafka.service.api.header.RecordHeader; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * {@link KafkaRecordConverter} implementation for transforming NiFi + * {@link org.apache.nifi.serialization.record.Record} objects to {@link KafkaRecord} for publish to + * Kafka. 
+ */ +public class RecordStreamKafkaRecordConverter implements KafkaRecordConverter { + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final HeadersFactory headersFactory; + private final KeyFactory keyFactory; + private final int maxMessageSize; + private final ComponentLog logger; + + public RecordStreamKafkaRecordConverter( + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final HeadersFactory headersFactory, + final KeyFactory keyFactory, + final int maxMessageSize, + final ComponentLog logger) { + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.headersFactory = headersFactory; + this.keyFactory = keyFactory; + this.maxMessageSize = maxMessageSize; + this.logger = logger; + } + + @Override + public Iterator<KafkaRecord> convert( + final Map<String, String> attributes, final InputStream in, final long inputLength) + throws IOException { + try { + final RecordReader reader = readerFactory.createRecordReader(attributes, in, inputLength, logger); + final RecordSet recordSet = reader.createRecordSet(); + final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); + + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + final RecordSetWriter writer = writerFactory.createWriter(logger, schema, os, attributes); + final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); + return toKafkaRecordIterator(attributes, os, writer, pushBackRecordSet); + } catch (MalformedRecordException | SchemaNotFoundException e) { + throw new IOException("Stream to Record conversion failed", e); + } + } + + private Iterator<KafkaRecord> toKafkaRecordIterator( + final Map<String, String> attributes, + final ByteArrayOutputStream os, + final RecordSetWriter writer, + final PushBackRecordSet pushBackRecordSet) throws IOException { + final ValueFactory valueFactory = new RecordValueFactory(os, 
writer); + final List<RecordHeader> headers = headersFactory.getHeaders(attributes); + + return new Iterator<>() { + @Override + public boolean hasNext() { + try { + return pushBackRecordSet.isAnotherRecord(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public KafkaRecord next() { + try { + final Record record = pushBackRecordSet.next(); + final byte[] key = keyFactory.getKey(attributes, record); + final byte[] value = valueFactory.getValue(record); + ProducerUtils.checkMessageSize(maxMessageSize, value.length); + return new KafkaRecord(null, null, null, key, value, headers); + } catch (final IOException e) { + throw new UncheckedIOException(e); Review Comment: A message should be included: ```suggestion throw new UncheckedIOException("Record conversion failed", e); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors.producer.convert; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.ProducerUtils; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordFieldConverter; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord; +import org.apache.nifi.kafka.service.api.header.RecordHeader; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * {@link KafkaRecordConverter} implementation for transforming NiFi + * {@link org.apache.nifi.serialization.record.Record} objects to {@link KafkaRecord} for publish to + * Kafka. 
+ */ +public class RecordWrapperStreamKafkaRecordConverter implements KafkaRecordConverter { + private final FlowFile flowFile; + private final RecordMetadataStrategy metadataStrategy; + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final RecordSetWriterFactory keyWriterFactory; + private final int maxMessageSize; + private final ComponentLog logger; + + public RecordWrapperStreamKafkaRecordConverter( + final FlowFile flowFile, + final RecordMetadataStrategy metadataStrategy, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final RecordSetWriterFactory keyWriterFactory, + final int maxMessageSize, + final ComponentLog logger) { + this.flowFile = flowFile; + this.metadataStrategy = metadataStrategy; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.keyWriterFactory = keyWriterFactory; + this.maxMessageSize = maxMessageSize; + this.logger = logger; + } + + @Override + public Iterator<KafkaRecord> convert( + final Map<String, String> attributes, final InputStream in, final long inputLength) + throws IOException { + try { + final RecordReader reader = readerFactory.createRecordReader(attributes, in, inputLength, logger); + final RecordSet recordSet = reader.createRecordSet(); + final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); + return toKafkaRecordIterator(pushBackRecordSet); + } catch (MalformedRecordException | SchemaNotFoundException e) { + throw new IOException("Stream to Record conversion failed", e); + } + } + + private Iterator<KafkaRecord> toKafkaRecordIterator( + final PushBackRecordSet pushBackRecordSet) { + return new Iterator<>() { + @Override + public boolean hasNext() { + try { + return pushBackRecordSet.isAnotherRecord(); + } catch (final IOException e) { + throw new UncheckedIOException(e); Review Comment: A message should be included: ```suggestion throw new 
UncheckedIOException("Record evaluation failed", e); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors.producer.convert; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.ProducerUtils; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordFieldConverter; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.processors.producer.wrapper.WrapperRecord; +import org.apache.nifi.kafka.service.api.header.RecordHeader; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.PushBackRecordSet; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * {@link KafkaRecordConverter} implementation for transforming NiFi + * {@link org.apache.nifi.serialization.record.Record} objects to {@link KafkaRecord} for publish to + * Kafka. 
+ */ +public class RecordWrapperStreamKafkaRecordConverter implements KafkaRecordConverter { + private final FlowFile flowFile; + private final RecordMetadataStrategy metadataStrategy; + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final RecordSetWriterFactory keyWriterFactory; + private final int maxMessageSize; + private final ComponentLog logger; + + public RecordWrapperStreamKafkaRecordConverter( + final FlowFile flowFile, + final RecordMetadataStrategy metadataStrategy, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final RecordSetWriterFactory keyWriterFactory, + final int maxMessageSize, + final ComponentLog logger) { + this.flowFile = flowFile; + this.metadataStrategy = metadataStrategy; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.keyWriterFactory = keyWriterFactory; + this.maxMessageSize = maxMessageSize; + this.logger = logger; + } + + @Override + public Iterator<KafkaRecord> convert( + final Map<String, String> attributes, final InputStream in, final long inputLength) + throws IOException { + try { + final RecordReader reader = readerFactory.createRecordReader(attributes, in, inputLength, logger); + final RecordSet recordSet = reader.createRecordSet(); + final PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet); + return toKafkaRecordIterator(pushBackRecordSet); + } catch (MalformedRecordException | SchemaNotFoundException e) { + throw new IOException("Stream to Record conversion failed", e); + } + } + + private Iterator<KafkaRecord> toKafkaRecordIterator( + final PushBackRecordSet pushBackRecordSet) { + return new Iterator<>() { + @Override + public boolean hasNext() { + try { + return pushBackRecordSet.isAnotherRecord(); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public KafkaRecord next() { + try { + final Record record = 
pushBackRecordSet.next(); + final RecordFieldConverter converter = new RecordFieldConverter(record, flowFile, logger); + final byte[] key = converter.toBytes(WrapperRecord.KEY, keyWriterFactory); + final byte[] value = converter.toBytes(WrapperRecord.VALUE, writerFactory); + ProducerUtils.checkMessageSize(maxMessageSize, value.length); + + final List<RecordHeader> headers = getKafkaHeaders(record); + + // wrapper record may specify custom topic / partition + String topic = null; + Integer partition = null; + if (metadataStrategy == RecordMetadataStrategy.FROM_RECORD) { + final MapRecord myMetadataRecord = (MapRecord) record.getValue(WrapperRecord.METADATA); + topic = myMetadataRecord.getAsString(WrapperRecord.TOPIC); + partition = myMetadataRecord.getAsInt(WrapperRecord.PARTITION); + } + + return new KafkaRecord(topic, partition, null, key, value, headers); + } catch (final IOException e) { + throw new UncheckedIOException(e); Review Comment: ```suggestion throw new UncheckedIOException("Record conversion failed", e); ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/wrapper/RecordFieldConverter.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors.producer.wrapper; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class RecordFieldConverter { + private final Record record; + private final FlowFile flowFile; + private final ComponentLog logger; + + public RecordFieldConverter(final Record record, final FlowFile flowFile, final ComponentLog logger) { + this.record = record; + this.flowFile = flowFile; + this.logger = logger; + } + + public byte[] toBytes(final String fieldName, final RecordSetWriterFactory writerFactory) throws IOException { + final byte[] bytes; + + try { + final Object field = record.getValue(fieldName); + if (field == null) { + bytes = null; + } else if (field instanceof Record) { + bytes = toBytes((Record) field, writerFactory); + } else if (field instanceof Byte[]) { + bytes = toBytes((Byte[]) field); + } else if (field instanceof Object[]) { + bytes = toBytes((Object[]) field); + } else if (field instanceof String) { + bytes = toBytes((String) field); + } else { + throw new MalformedRecordException(String.format("Couldn't convert %s record data to byte array.", fieldName)); Review Comment: Recommend avoiding contractions and periods in messages: ```suggestion throw new MalformedRecordException(String.format("Failed to convert [%s] record data to byte array", fieldName)); ``` ########## 
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ########## @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import 
org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import 
org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. " + + "The messages to send may be individual FlowFiles, may be delimited using a " + + "user-specified delimiter (such as a new-line), or " + + "may be record-oriented data that can be read by the configured Record Reader. " + + "The complementary NiFi processor for fetching messages is ConsumeKafka.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured " + + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. 
This attribute is added only to " + + "FlowFiles that are routed to success.") +@SeeAlso({ConsumeKafka.class}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { + protected static final String MSG_COUNT = "msg.count"; + + public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Kafka Connection Service") + .description("Provides connections to Kafka Broker for publishing Kafka Records") + .identifiesControllerService(KafkaConnectionService.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("Name of the Kafka Topic to which the Processor publishes Kafka Records") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + .description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to Kafka") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() + .name("Publish Strategy") + .description("The format used to publish the incoming FlowFile record to Kafka.") + .required(true) + .defaultValue(PublishStrategy.USE_VALUE.getValue()) + .allowableValues(PublishStrategy.class) + .build(); + + 
public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + .name("Message Key Field") + .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message. " + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." 
+ + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(KeyEncoding.UTF8.getValue()) + .allowableValues(KeyEncoding.class) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + .name("Attributes to Send as Headers (Regex)") + .description("A Regular Expression that is matched against all FlowFile attribute names. " + + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. 
" + + "If not specified, no FlowFile attributes will be added as headers.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() + .name("Transactions Enabled") + .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " + + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + + "requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder() + .name("Transactional Id Prefix") + .description("When [Transactions Enabled] is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .dependsOn(TRANSACTIONS_ENABLED, "true") + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("Message Header Encoding") + .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, " + + "this property indicates the Character Encoding to use for serializing the headers.") + 
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(false) + .build(); + + static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder() + .name("Record Key Writer") + .description("The Record Key Writer to use for outgoing FlowFiles") + .identifiesControllerService(RecordSetWriterFactory.class) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .build(); + + public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder() + .name("Record Metadata Strategy") + .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " + + "Topic Name and Partition / Partitioner class properties") + .required(true) + .defaultValue(RecordMetadataStrategy.FROM_PROPERTIES.getValue()) + .allowableValues(RecordMetadataStrategy.class) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .build(); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner", + "RoundRobinPartitioner", + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "The default partitioning strategy will choose the sticky partition that changes when the batch is full " + + "(See KIP-480 for details about sticky partitioning)."); + public static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.ExpressionLanguagePartitioner", Review Comment: Is there a reason this is public versus of the others? 
Recommend moving all values to a separate `enum`. ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ########## @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; 
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a 
FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. " + + "The messages to send may be individual FlowFiles, may be delimited using a " + + "user-specified delimiter (such as a new-line), or " + + "may be record-oriented data that can be read by the configured Record Reader. " + + "The complementary NiFi processor for fetching messages is ConsumeKafka.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured " + + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + + "FlowFiles that are routed to success.") +@SeeAlso({ConsumeKafka.class}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { + protected static final String MSG_COUNT = "msg.count"; + + public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Kafka Connection Service") + .description("Provides connections to Kafka Broker for publishing Kafka Records") + .identifiesControllerService(KafkaConnectionService.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("Name of the Kafka Topic to which the Processor publishes Kafka Records") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + 
.description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to Kafka") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() + .name("Publish Strategy") + .description("The format used to publish the incoming FlowFile record to Kafka.") + .required(true) + .defaultValue(PublishStrategy.USE_VALUE.getValue()) + .allowableValues(PublishStrategy.class) + .build(); + + public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + .name("Message Key Field") + .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. 
" + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message. " + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. 
This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(KeyEncoding.UTF8.getValue()) + .allowableValues(KeyEncoding.class) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + .name("Attributes to Send as Headers (Regex)") Review Comment: The parenthetical should be removed from the name. What do you think of an alternative name? ```suggestion .name("FlowFile Attribute Header Pattern") ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ########## @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; 
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a 
FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. " + + "The messages to send may be individual FlowFiles, may be delimited using a " + + "user-specified delimiter (such as a new-line), or " + + "may be record-oriented data that can be read by the configured Record Reader. " + + "The complementary NiFi processor for fetching messages is ConsumeKafka.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured " + + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + + "FlowFiles that are routed to success.") +@SeeAlso({ConsumeKafka.class}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { + protected static final String MSG_COUNT = "msg.count"; + + public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Kafka Connection Service") + .description("Provides connections to Kafka Broker for publishing Kafka Records") + .identifiesControllerService(KafkaConnectionService.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("Name of the Kafka Topic to which the Processor publishes Kafka Records") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + 
.description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to Kafka") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() + .name("Publish Strategy") + .description("The format used to publish the incoming FlowFile record to Kafka.") + .required(true) + .defaultValue(PublishStrategy.USE_VALUE.getValue()) + .allowableValues(PublishStrategy.class) + .build(); + + public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + .name("Message Key Field") + .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. 
" + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message. " + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. 
This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(KeyEncoding.UTF8.getValue()) + .allowableValues(KeyEncoding.class) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + .name("Attributes to Send as Headers (Regex)") + .description("A Regular Expression that is matched against all FlowFile attribute names. " + + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + + "If not specified, no FlowFile attributes will be added as headers.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() + .name("Transactions Enabled") + .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " + + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. 
Setting this to true " + + "requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder() + .name("Transactional Id Prefix") Review Comment: ```suggestion .name("Transactional ID Prefix") ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/config/DeliveryGuarantee.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors.producer.config; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; + +public class DeliveryGuarantee { + + // https://github.com/apache/kafka/blob/5fa48214448ddf19270a35f1dd5156a4eece4ca7/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L117 + public static final String ACKS_CONFIG = "acks"; + + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + + "number of Kafka Nodes according to the Topic configuration"); + public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed to success if the message is received by a single Kafka node, " + + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " + + "but can result in data loss if a Kafka node crashes"); + public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after successfully sending the content to a Kafka node, " + + "without waiting for any acknowledgment from the node at all. This provides the best performance but may result in data loss."); Review Comment: Recommend refactoring these values to an `enum` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaProducerWrapper.java: ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer.txn; Review Comment: Recommend spelling out the name `transaction`: ```suggestion package org.apache.nifi.kafka.service.producer.transaction; ``` ########## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ########## @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; 
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a 
FlowFile as either a message or as individual records to Apache Kafka using the Kafka Producer API. " + + "The messages to send may be individual FlowFiles, may be delimited using a " + + "user-specified delimiter (such as a new-line), or " + + "may be record-oriented data that can be read by the configured Record Reader. " + + "The complementary NiFi processor for fetching messages is ConsumeKafka.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured " + + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + + "FlowFiles that are routed to success.") +@SeeAlso({ConsumeKafka.class}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { + protected static final String MSG_COUNT = "msg.count"; + + public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() + .name("Kafka Connection Service") + .description("Provides connections to Kafka Broker for publishing Kafka Records") + .identifiesControllerService(KafkaConnectionService.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + + public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("Name of the Kafka Topic to which the Processor publishes Kafka Records") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("Record Reader") + 
.description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to Kafka") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder() + .name("Publish Strategy") + .description("The format used to publish the incoming FlowFile record to Kafka.") + .required(true) + .defaultValue(PublishStrategy.USE_VALUE.getValue()) + .allowableValues(PublishStrategy.class) + .build(); + + public static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + .name("Message Key Field") + .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. 
" + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + + public static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message. " + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. 
This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(KeyEncoding.UTF8.getValue()) + .allowableValues(KeyEncoding.class) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + public static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + .name("Attributes to Send as Headers (Regex)") + .description("A Regular Expression that is matched against all FlowFile attribute names. " + + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + + "If not specified, no FlowFile attributes will be added as headers.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_VALUE.getValue()) + .required(false) + .build(); + + public static final PropertyDescriptor TRANSACTIONS_ENABLED = new PropertyDescriptor.Builder() + .name("Transactions Enabled") + .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " + + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. 
Setting this to true " + + "requires that the [Delivery Guarantee] property be set to [Guarantee Replicated Delivery.]") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder() + .name("Transactional Id Prefix") + .description("When [Transactions Enabled] is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.") + .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .dependsOn(TRANSACTIONS_ENABLED, "true") + .required(false) + .build(); + + static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("Message Header Encoding") + .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, " + + "this property indicates the Character Encoding to use for serializing the headers.") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(false) + .build(); + + static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder() + .name("Record Key Writer") + .description("The Record Key Writer to use for outgoing FlowFiles") + .identifiesControllerService(RecordSetWriterFactory.class) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .build(); + + public static final PropertyDescriptor RECORD_METADATA_STRATEGY = new PropertyDescriptor.Builder() + .name("Record Metadata Strategy") + .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured " + + "Topic Name and Partition / Partitioner class properties") + .required(true) + 
.defaultValue(RecordMetadataStrategy.FROM_PROPERTIES.getValue()) + .allowableValues(RecordMetadataStrategy.class) + .dependsOn(PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.getValue()) + .build(); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner", + "RoundRobinPartitioner", + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "The default partitioning strategy will choose the sticky partition that changes when the batch is full " + + "(See KIP-480 for details about sticky partitioning)."); + public static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue("org.apache.nifi.processors.kafka.pubsub.Partitioners.ExpressionLanguagePartitioner", + "Expression Language Partitioner", + "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " + + "so all Records in a given FlowFile will go to the same partition."); + + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name("partitioner.class") + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. 
Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); + + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + .name("partition") + .displayName("Partition") + .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList( + CONNECTION_SERVICE, + DeliveryGuarantee.DELIVERY_GUARANTEE, + TOPIC_NAME, + RECORD_READER, + RECORD_WRITER, + PUBLISH_STRATEGY, + RECORD_KEY_WRITER, + RECORD_METADATA_STRATEGY, + MESSAGE_DEMARCATOR, + FAILURE_STRATEGY, + KEY, + KEY_ATTRIBUTE_ENCODING, + ATTRIBUTE_NAME_REGEX, + TRANSACTIONS_ENABLED, + TRANSACTIONAL_ID_PREFIX, + MESSAGE_HEADER_ENCODING, + MESSAGE_KEY_FIELD, + MAX_REQUEST_SIZE, + COMPRESSION_CODEC, + PARTITION_CLASS, + PARTITION + )); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); Review Comment: ```suggestion private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); 
``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
