greyp9 commented on code in PR #9604:
URL: https://github.com/apache/nifi/pull/9604#discussion_r1901992296
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/convert/RecordWrapperStreamKafkaRecordConverter.java:
##########
@@ -107,7 +107,10 @@ public KafkaRecord next() {
final RecordFieldConverter converter = new
RecordFieldConverter(record, flowFile, logger);
final byte[] key = converter.toBytes(WrapperRecord.KEY,
keyWriterFactory);
final byte[] value =
converter.toBytes(WrapperRecord.VALUE, writerFactory);
- ProducerUtils.checkMessageSize(maxMessageSize,
value.length);
+
+ if (value != null) {
Review Comment:
For the RecordWrapper strategy, some logic is needed at this point to
evaluate the value together with the FlowFile attribute "kafka.tombstone".
One option is to implement a utility method and apply it inline at line 109,
something like:
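```
// Returns null (a Kafka tombstone) only when the FlowFile is flagged as a
// tombstone and has zero-byte content; otherwise returns the bytes unchanged.
private byte[] toValue(final byte[] bytes, final FlowFile flowFile) {
    final String tombstone = flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE);
    final boolean isTombstone = Boolean.TRUE.toString().equals(tombstone)
            && (flowFile.getSize() == 0);
    return isTombstone ? null : bytes;
}
```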
Adding the zero-byte value test case described in this review surfaces the
issue.
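To make the intended behavior easy to check in isolation, here is a minimal standalone sketch of the same check. It assumes the attribute value and the FlowFile size are passed in directly instead of being read from a `FlowFile`; the class name `TombstoneValueSketch` and the three-argument signature are hypothetical and not part of NiFi.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the suggested utility method; not part of NiFi.
final class TombstoneValueSketch {

    // Returns null (a Kafka tombstone) only when the attribute is exactly "true"
    // and the FlowFile content is empty; otherwise returns the bytes unchanged.
    static byte[] toValue(final byte[] bytes, final String tombstoneAttribute, final long flowFileSize) {
        final boolean isTombstone = Boolean.TRUE.toString().equals(tombstoneAttribute)
                && flowFileSize == 0;
        return isTombstone ? null : bytes;
    }

    public static void main(final String[] args) {
        final byte[] empty = new byte[0];
        final byte[] payload = "data".getBytes(StandardCharsets.UTF_8);

        // Attribute set and zero-byte content: treated as a tombstone.
        System.out.println(toValue(empty, "true", 0) == null);
        // Zero-byte content without the attribute: an empty value, not a tombstone.
        System.out.println(toValue(empty, null, 0) == null);
        // Attribute set but non-empty content: the payload is kept.
        System.out.println(toValue(payload, "true", payload.length) == null);
    }
}
```

Both conditions must hold for the value to become `null`, so a zero-byte FlowFile without the attribute is still published as an empty value.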
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/PublishKafkaWrapperX6IT.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.publish.additional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.nifi.kafka.processors.AbstractPublishKafkaIT;
+import org.apache.nifi.kafka.processors.PublishKafka;
+import
org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class PublishKafkaWrapperX6IT extends AbstractPublishKafkaIT {
+ private static final String TEST_TOPIC = "nifi-" +
PublishKafkaWrapperX6IT.class.getName();
+ private static final String OVERRIDE_TOPIC = "topic1-" +
PublishKafkaWrapperX6IT.class.getName();
+ private static final Integer TEST_PARTITION = new Random().nextInt(3) - 1;
+ private static final Integer OVERRIDE_PARTITION = 1;
+
+ private static final String TEST_RESOURCE =
"org/apache/nifi/kafka/processors/publish/additional/wrapperX6.json";
+
+ @BeforeAll
+ protected static void beforeAll() {
+ AbstractPublishKafkaIT.beforeAll();
+
+ final Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
+ try (final Admin adminClient = Admin.create(properties)) {
+ final int numPartitions = 3;
+ final short replicationFactor = 1;
+ final NewTopic testTopic = new NewTopic(TEST_TOPIC, numPartitions,
replicationFactor);
+ final NewTopic overrideTopic = new NewTopic(OVERRIDE_TOPIC,
numPartitions, replicationFactor);
+ final CreateTopicsResult topics =
adminClient.createTopics(Arrays.asList(testTopic, overrideTopic));
+ final KafkaFuture<Void> testTopicFuture =
topics.values().get(TEST_TOPIC);
+ final KafkaFuture<Void> overrideTopicFuture =
topics.values().get(OVERRIDE_TOPIC);
+ testTopicFuture.get(5, TimeUnit.SECONDS);
+ overrideTopicFuture.get(5, TimeUnit.SECONDS);
+ } catch (ExecutionException | InterruptedException | TimeoutException
e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void test1ProduceOneFlowFile() throws InitializationException,
IOException {
+ final TestRunner runner =
TestRunners.newTestRunner(PublishKafka.class);
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(PublishKafka.CONNECTION_SERVICE,
addKafkaConnectionService(runner));
+ addRecordReaderService(runner);
+ addRecordWriterService(runner);
+ addRecordKeyWriterService(runner);
+
+ runner.setProperty(PublishKafka.TOPIC_NAME, TEST_TOPIC);
+ runner.setProperty(PublishKafka.PARTITION,
Integer.toString(TEST_PARTITION));
+ runner.getLogger().info("partition={}", TEST_PARTITION);
+ runner.setProperty(PublishKafka.PUBLISH_STRATEGY,
PublishStrategy.USE_WRAPPER.name());
+ runner.setProperty(PublishKafka.RECORD_METADATA_STRATEGY,
RecordMetadataStrategy.FROM_RECORD.getValue());
+
+ final Map<String, String> attributes = new HashMap<>();
+ final byte[] bytesFlowFileTemplate =
IOUtils.toByteArray(Objects.requireNonNull(
+ getClass().getClassLoader().getResource(TEST_RESOURCE)));
+ final byte[] bytesFlowFile = new String(bytesFlowFileTemplate,
StandardCharsets.UTF_8)
+ .replace("topic1",
OVERRIDE_TOPIC).getBytes(StandardCharsets.UTF_8);
+ runner.enqueue(bytesFlowFile, attributes);
+ runner.run(1);
+ runner.assertAllFlowFilesTransferred("success", 1);
+ }
+
+ @Test
+ public void test2ConsumeOneRecord() throws IOException {
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(getKafkaConsumerProperties())) {
+ consumer.subscribe(Arrays.asList(TEST_TOPIC, OVERRIDE_TOPIC));
+ final ConsumerRecords<String, String> records =
consumer.poll(DURATION_POLL);
+ assertEquals(1, records.count());
Review Comment:
This assertion should expect three records once the test resource is
adjusted.
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java:
##########
@@ -185,14 +185,20 @@ public ByteRecord next() {
recordHeaders.add(recordHeader);
});
+ // NIFI-14122: Support Kafka tombstones
Review Comment:
Code comments that carry JIRA numbers are generally disfavored in production
code (though they do exist). Maybe just:
`Support Kafka tombstones`
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/resources/org/apache/nifi/kafka/processors/publish/additional/wrapperX6.json:
##########
@@ -0,0 +1,14 @@
+{
Review Comment:
I think you were on to something with the previous version of this test
resource. Three test cases should provide good coverage:
- value is empty string
- value is null
- omit value from the JSON representation of the record
(The test code would then need to be adjusted to expect three records to be
emitted from the publish part.)
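The three cases differ only in how the `value` field appears in the parsed record. The sketch below models that distinction with a plain `Map`; it is an illustration only, with hypothetical names (`ValueCaseSketch`, `ValueCase`, `classify`), and it makes no claim about how the configured Record Reader or the converter handles each case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the three proposed test inputs; not NiFi code.
final class ValueCaseSketch {

    enum ValueCase { EMPTY_STRING, EXPLICIT_NULL, OMITTED, PRESENT }

    // Distinguishes a missing "value" key from a key mapped to null or to "".
    static ValueCase classify(final Map<String, Object> record) {
        if (!record.containsKey("value")) {
            return ValueCase.OMITTED;
        }
        final Object value = record.get("value");
        if (value == null) {
            return ValueCase.EXPLICIT_NULL;
        }
        return "".equals(value) ? ValueCase.EMPTY_STRING : ValueCase.PRESENT;
    }

    public static void main(final String[] args) {
        final Map<String, Object> emptyString = new HashMap<>();
        emptyString.put("key", "k1");
        emptyString.put("value", "");

        final Map<String, Object> explicitNull = new HashMap<>();
        explicitNull.put("key", "k2");
        explicitNull.put("value", null);

        final Map<String, Object> omitted = new HashMap<>();
        omitted.put("key", "k3");

        System.out.println(classify(emptyString));
        System.out.println(classify(explicitNull));
        System.out.println(classify(omitted));
    }
}
```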
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##########
@@ -92,7 +92,8 @@
+ "The messages to send may be individual FlowFiles, may be delimited
using a "
+ "user-specified delimiter (such as a new-line), or "
+ "may be record-oriented data that can be read by the configured
Record Reader. "
- + "The complementary NiFi processor for fetching messages is
ConsumeKafka.")
+ + "The complementary NiFi processor for fetching messages is
ConsumeKafka. "
+ + "To produce a kafka tombstone message while using
PublishStrategy.USE_WRAPPER, simply set the value of a record to 'null'.")
Review Comment:
The previous implementation seems to suggest that `value=null` should be
sent only when both conditions hold (zero-byte FlowFile, tombstone attribute
set). However, it is useful to be able to send `n` wrapper records where some
are tombstone records and some are not. So this should be ok.
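As a rough illustration of the per-record behavior, the sketch below models a batch of `n` key/value pairs in which only the entries with a `null` value count as tombstones. The class `MixedBatchSketch` and the method `countTombstones` are hypothetical; the sketch uses plain Java collections rather than the NiFi or Kafka APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a mixed batch of wrapper records; not NiFi code.
final class MixedBatchSketch {

    // Counts the entries whose value is null, i.e. the tombstones in the batch.
    static long countTombstones(final List<Map.Entry<String, byte[]>> batch) {
        return batch.stream().filter(entry -> entry.getValue() == null).count();
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, byte[]>> batch = new ArrayList<>();
        // Regular record with a payload.
        batch.add(new SimpleEntry<>("k1", "v1".getBytes(StandardCharsets.UTF_8)));
        // Tombstone: the value is null.
        batch.add(new SimpleEntry<>("k2", null));
        // Empty value: zero bytes, but not null, so not a tombstone.
        batch.add(new SimpleEntry<>("k3", new byte[0]));

        System.out.println("records=" + batch.size() + " tombstones=" + countTombstones(batch));
    }
}
```

Deciding per record lets one FlowFile carry both regular records and tombstones, which a FlowFile-level attribute alone cannot express.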
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.