http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html deleted file mode 100644 index 20ce03c..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html +++ /dev/null @@ -1,47 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> - <head> - <meta charset="utf-8" /> - <title>PublishKafka</title> - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p> - This Processors puts the contents of a FlowFile to a Topic in - <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available - with Kafka 0.9+ API. The content of a FlowFile becomes the contents of a Kafka message. - This message is optionally assigned a key by using the <Kafka Key> Property. - </p> - - <p> - The Processor allows the user to configure an optional Message Demarcator that - can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used - to indicate that the contents of the FlowFile should be used to send one message - per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator'). - If the property is not set, the entire contents of the FlowFile - will be sent as a single message. When using the demarcator, if some messages are - successfully sent but other messages fail to send, the resulting FlowFile will be - considered a failed FlowFuile and will have additional attributes to that effect. - One of such attributes is 'failed.last.idx' which indicates the index of the last message - that was successfully ACKed by Kafka. (if no demarcator is used the value of this index will be -1). - This will allow PublishKafka to only re-send un-ACKed messages on the next re-try. - </p> - </body> -</html>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java deleted file mode 100644 index d09be60..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.Closeable; -import java.lang.reflect.Field; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class AbstractKafkaProcessorLifecycleTest { - - private final static Random random = new Random(); - - @Test - public void validateBaseProperties() throws Exception { - TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class); - runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, ""); - runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo"); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white 
space")); - } - - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo"); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid")); - } - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234"); - - runner.removeProperty(ConsumeKafka.TOPIC); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'topic' is invalid because topic is required")); - } - - runner.setProperty(ConsumeKafka.TOPIC, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.TOPIC, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - runner.setProperty(ConsumeKafka.TOPIC, "blah"); - - runner.removeProperty(ConsumeKafka.CLIENT_ID); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("invalid because client.id is required")); - } - - runner.setProperty(ConsumeKafka.CLIENT_ID, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.CLIENT_ID, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj"); - - runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - 
runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - } - - @Test - @Ignore // just for extra sanity check - public void validateConcurrencyWithRandomFailuresMultiple() throws Exception { - for (int i = 0; i < 100; i++) { - validateConcurrencyWithRandomFailures(); - } - } - - @Test - public void validateConcurrencyWithRandomFailures() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); - if (random.nextInt(10) == 5) { - 
when(context.getName()).thenReturn("fail"); - } - try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * this has to be <= 1 since the last thread may come to finally block - * after acceptTask flag has been reset at which point the close will - * not be called (which is correct behavior since it will be invoked - * explicitly by the life-cycle operations of a running processor). 
- * - * You can actually observe the =1 behavior in the next test where it is - * always 0 close calls - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertTrue(closeVsInitDiff <= 1); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - @Test - public void validateConcurrencyWithAllSuccesses() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); 
- try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * unlike previous test this one will always be 1 since there are no - * failures - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertEquals(1, closeVsInitDiff); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - @Test - public void validateConcurrencyWithAllFailures() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void 
answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); - when(context.getName()).thenReturn("fail"); - try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * unlike previous test this one will always be 0 since all triggers are - * failures - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertEquals(0, closeVsInitDiff); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - /** - * - */ - public static class DummyProcessor extends 
AbstractKafkaProcessor<Closeable> { - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException { - return true; - } - - @Override - protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - return mock(Closeable.class); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return SHARED_DESCRIPTORS; - } - } - - - public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> { - final AtomicInteger failedTriggers = new AtomicInteger(); - final AtomicInteger successfulTriggers = new AtomicInteger(); - final AtomicInteger resourceReinitialized = new AtomicInteger(); - final AtomicInteger closeCounter = new AtomicInteger(); - - ConcurrencyValidatingProcessor() { - try { - Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger"); - loggerField.setAccessible(true); - loggerField.set(this, mock(ComponentLog.class)); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - @Override - @OnStopped - public void close() { - super.close(); - assertTrue(this.kafkaResource == null); - closeCounter.incrementAndGet(); - } - - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) { - assertNotNull(this.kafkaResource); - if ("fail".equals(context.getName())) { - failedTriggers.incrementAndGet(); - throw new RuntimeException("Intentional"); - } - this.successfulTriggers.incrementAndGet(); - return true; - } - - @Override - protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - this.resourceReinitialized.incrementAndGet(); - return mock(Closeable.class); - } - } -} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java deleted file mode 100644 index 8e17a21..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.nio.charset.StandardCharsets; -import java.util.List; - -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; - -// The test is valid and should be ran when working on this module. @Ignore is -// to speed up the overall build -public class ConsumeKafkaTest { - - @Test - public void validateCustomSerilaizerDeserializerSettings() throws Exception { - ConsumeKafka consumeKafka = new ConsumeKafka(); - TestRunner runner = TestRunners.newTestRunner(consumeKafka); - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(ConsumeKafka.TOPIC, "foo"); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); - runner.assertValid(); - runner.setProperty("key.deserializer", "Foo"); - runner.assertNotValid(); - } - - @Test - public void validatePropertiesValidation() throws Exception { - ConsumeKafka consumeKafka = new ConsumeKafka(); - TestRunner runner = TestRunners.newTestRunner(consumeKafka); - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(ConsumeKafka.TOPIC, "foo"); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - - runner.removeProperty(ConsumeKafka.GROUP_ID); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("invalid because 
group.id is required")); - } - - runner.setProperty(ConsumeKafka.GROUP_ID, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.GROUP_ID, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - } - - /** - * Will set auto-offset to 'smallest' to ensure that all events (the once - * that were sent before and after consumer startup) are received. - */ - @Test - public void validateGetAllMessages() throws Exception { - String topicName = "validateGetAllMessages"; - - StubConsumeKafka consumeKafka = new StubConsumeKafka(); - - final TestRunner runner = TestRunners.newTestRunner(consumeKafka); - runner.setValidateExpressionUsage(false); - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPIC, topicName); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.setProperty("check.connection", "false"); - - byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hello-2".getBytes(StandardCharsets.UTF_8), "Hello-3".getBytes(StandardCharsets.UTF_8) }; - consumeKafka.setValues(values); - - runner.run(1, false); - - values = new byte[][] { "Hello-4".getBytes(StandardCharsets.UTF_8), "Hello-5".getBytes(StandardCharsets.UTF_8), - "Hello-6".getBytes(StandardCharsets.UTF_8) }; - consumeKafka.setValues(values); - - runner.run(1, false); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS); - - assertEquals(6, flowFiles.size()); - // spot check - MockFlowFile flowFile = flowFiles.get(0); - String event = new 
String(flowFile.toByteArray()); - assertEquals("Hello-1", event); - - flowFile = flowFiles.get(1); - event = new String(flowFile.toByteArray()); - assertEquals("Hello-2", event); - - flowFile = flowFiles.get(5); - event = new String(flowFile.toByteArray()); - assertEquals("Hello-6", event); - - consumeKafka.close(); - } - - @Test - public void validateGetAllMessagesWithProvidedDemarcator() throws Exception { - String topicName = "validateGetAllMessagesWithProvidedDemarcator"; - - StubConsumeKafka consumeKafka = new StubConsumeKafka(); - - final TestRunner runner = TestRunners.newTestRunner(consumeKafka); - runner.setValidateExpressionUsage(false); - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPIC, topicName); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - runner.setProperty(ConsumeKafka.GROUP_ID, "foo"); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah"); - runner.setProperty("check.connection", "false"); - - byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8), - "Hi-2".getBytes(StandardCharsets.UTF_8) }; - consumeKafka.setValues(values); - - runner.run(1, false); - List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS); - assertEquals(1, flowFiles.size()); - - values = new byte[][] { "ÐдÑавÑÑвÑйÑе-3".getBytes(StandardCharsets.UTF_8), - "ããã«ã¡ã¯-4".getBytes(StandardCharsets.UTF_8), "Hello-5".getBytes(StandardCharsets.UTF_8) }; - consumeKafka.setValues(values); - - runner.run(1, false); - - flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS); - - assertEquals(2, flowFiles.size()); - MockFlowFile flowFile = flowFiles.get(0); - String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah"); - assertEquals("0", flowFile.getAttribute("kafka.partition")); - assertEquals("0", 
flowFile.getAttribute("kafka.offset")); - assertEquals("validateGetAllMessagesWithProvidedDemarcator", flowFile.getAttribute("kafka.topic")); - - assertEquals(2, events.length); - - flowFile = flowFiles.get(1); - events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah"); - - assertEquals(3, events.length); - // spot check - assertEquals("ÐдÑавÑÑвÑйÑе-3", events[0]); - assertEquals("ããã«ã¡ã¯-4", events[1]); - assertEquals("Hello-5", events[2]); - - consumeKafka.close(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java deleted file mode 100644 index 6b8b042..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.kafka.clients.producer.Partitioner; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; -import org.apache.nifi.processors.kafka.test.EmbeddedKafka; -import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.ConsumerTimeoutException; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; - -public class KafkaPublisherTest { - - private static EmbeddedKafka kafkaLocal; - - private static EmbeddedKafkaProducerHelper producerHelper; - - @BeforeClass - public static void beforeClass() { - kafkaLocal = new EmbeddedKafka(); - kafkaLocal.start(); - producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); - 
} - - @AfterClass - public static void afterClass() throws Exception { - producerHelper.close(); - kafkaLocal.stop(); - } - - @Test - public void validateSuccessfulSendAsWhole() throws Exception { - InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8)); - String topicName = "validateSuccessfulSendAsWhole"; - - Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - KafkaPublisherResult result = publisher.publish(publishingContext); - - assertEquals(0, result.getLastMessageAcked()); - assertEquals(1, result.getMessagesSent()); - contentStream.close(); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - assertNotNull(iter.next()); - try { - iter.next(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - } - - @Test - public void validateSuccessfulSendAsDelimited() throws Exception { - InputStream contentStream = new ByteArrayInputStream( - "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8)); - String topicName = "validateSuccessfulSendAsDelimited"; - - Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - KafkaPublisherResult result = publisher.publish(publishingContext); - - assertEquals(3, result.getLastMessageAcked()); - assertEquals(4, result.getMessagesSent()); - contentStream.close(); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - assertNotNull(iter.next()); - 
assertNotNull(iter.next()); - assertNotNull(iter.next()); - assertNotNull(iter.next()); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - } - - /* - * This test simulates the condition where not all messages were ACKed by - * Kafka - */ - @Test - public void validateRetries() throws Exception { - byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); - InputStream contentStream = new ByteArrayInputStream(testValue); - String topicName = "validateSuccessfulReSendOfFailedSegments"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - // simulates the first re-try - int lastAckedMessageIndex = 1; - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - - publisher.publish(publishingContext); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - String m1 = new String(iter.next().message()); - String m2 = new String(iter.next().message()); - assertEquals("Hello Kafka3", m1); - assertEquals("Hello Kafka4", m2); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - // simulates the second re-try - lastAckedMessageIndex = 2; - contentStream = new ByteArrayInputStream(testValue); - publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(publishingContext); - - m1 = new String(iter.next().message()); - assertEquals("Hello Kafka4", m1); - - publisher.close(); - } - - /* - * Similar to the above test, but it sets the first retry index to the last - * 
possible message index and second index to an out of bound index. The - * expectation is that no messages will be sent to Kafka - */ - @Test - public void validateRetriesWithWrongIndex() throws Exception { - byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8); - InputStream contentStream = new ByteArrayInputStream(testValue); - String topicName = "validateRetriesWithWrongIndex"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - - // simulates the first re-try - int lastAckedMessageIndex = 3; - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - - publisher.publish(publishingContext); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - // simulates the second re-try - lastAckedMessageIndex = 6; - contentStream = new ByteArrayInputStream(testValue); - publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); - publisher.publish(publishingContext); - try { - iter.next(); - fail(); - } catch (ConsumerTimeoutException e) { - // that's OK since this is the Kafka mechanism to unblock - } - - publisher.close(); - } - - @Test - public void validateWithMultiByteCharactersNoDelimiter() throws Exception { - String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; - InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); - String topicName = "validateWithMultiByteCharacters"; - - Properties kafkaProperties = this.buildProducerProperties(); - - KafkaPublisher 
publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - - publisher.publish(publishingContext); - publisher.close(); - - ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName); - String r = new String(iter.next().message(), StandardCharsets.UTF_8); - assertEquals(data, r); - } - - @Test - public void validateWithNonDefaultPartitioner() throws Exception { - String data = "fooandbarandbaz"; - InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); - String topicName = "validateWithNonDefaultPartitioner"; - - Properties kafkaProperties = this.buildProducerProperties(); - kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName()); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); - PublishingContext publishingContext = new PublishingContext(contentStream, topicName); - publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8)); - - try { - publisher.publish(publishingContext); - // partitioner should be invoked 3 times - assertTrue(TestPartitioner.counter == 3); - publisher.close(); - } finally { - TestPartitioner.counter = 0; - } - } - - private Properties buildProducerProperties() { - Properties kafkaProperties = new Properties(); - kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName()); - kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName()); - kafkaProperties.setProperty("bootstrap.servers", "localhost:" + kafkaLocal.getKafkaPort()); - kafkaProperties.put("auto.create.topics.enable", "true"); - return kafkaProperties; - } - - private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) { - Properties props = new Properties(); - props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort()); - props.put("group.id", "test"); - props.put("consumer.timeout.ms", 
"500"); - props.put("auto.offset.reset", "smallest"); - ConsumerConfig consumerConfig = new ConsumerConfig(props); - ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); - Map<String, Integer> topicCountMap = new HashMap<>(1); - topicCountMap.put(topic, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); - ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); - return iter; - } - - public static class TestPartitioner implements Partitioner { - static int counter; - - @Override - public void configure(Map<String, ?> configs) { - // nothing to do, test - } - - @Override - public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, - Cluster cluster) { - counter++; - return 0; - } - - @Override - public void close() { - counter = 0; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java deleted file mode 100644 index 01c5fdd..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.nio.charset.StandardCharsets; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.mockito.Mockito; - -// The test is valid and should be ran when working on this module. 
@Ignore is -// to speed up the overall build -public class PublishKafkaTest { - - @Test - public void validateCustomSerilaizerDeserializerSettings() throws Exception { - PublishKafka publishKafka = new PublishKafka(); - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(PublishKafka.TOPIC, "foo"); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec"); - runner.setProperty("key.serializer", ByteArraySerializer.class.getName()); - runner.assertValid(); - runner.setProperty("key.serializer", "Foo"); - runner.assertNotValid(); - } - - @Test - public void validatePropertiesValidation() throws Exception { - PublishKafka publishKafka = new PublishKafka(); - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234"); - runner.setProperty(PublishKafka.TOPIC, "foo"); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "foo"); - - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid")); - } - } - - @Test - public void validateCustomValidation() { - String topicName = "validateCustomValidation"; - PublishKafka publishKafka = new PublishKafka(); - - /* - * Validates that Kerberos principle is required if one of SASL set for - * secirity protocol - */ - TestRunner runner = TestRunners.newTestRunner(publishKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.SECURITY_PROTOCOL, PublishKafka.SEC_SASL_PLAINTEXT); - try { - runner.run(); - fail(); - } catch (Throwable e) { - assertTrue(e.getMessage().contains("'Kerberos Service Name' is 
invalid because")); - } - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateSingleCharacterDemarcatedMessages() { - String topicName = "validateSingleCharacterDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() { - String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName()); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo"); - - runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - - 
runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() { - String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner"; - StubPublishKafka putKafka = new StubPublishKafka(1); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName()); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo"); - - runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(7)).send(Mockito.any(ProducerRecord.class)); - - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnSendFailureAndThenResendSuccessA() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis"); - - final String text = "Hello World\nGoodbye\nfail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure - runner.run(1, false); - assertEquals(0, 
runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - putKafka.destroy(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnSendFailureAndThenResendSuccessB() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(1); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); - - final String text = "Hello World\nGoodbye\nfail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 
millis"); - - final String text = "futurefail\nHello World\nGoodbye\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0); - assertNotNull(ff); - runner.enqueue(ff); - - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - // 6 sends due to duplication - verify(producer, times(5)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception { - String topicName = "validateSendFailureAndThenResendSuccess"; - StubPublishKafka putKafka = new StubPublishKafka(100); - - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); - - final String text = "Hello World\nGoodbye\nfuturefail\n2"; - runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0); - assertNotNull(ff); - runner.enqueue(ff); - - runner.run(1, false); - assertEquals(0, runner.getQueueSize().getObjectCount()); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - // 6 sends due to duplication - verify(producer, times(6)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateDemarcationIntoEmptyMessages() { - String topicName = "validateDemarcationIntoEmptyMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - final 
TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.KEY, "key1"); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8); - runner.enqueue(bytes); - runner.run(1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexRightPartialDemarcatedMessages() { - String topicName = "validateComplexRightPartialDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(3)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexLeftPartialDemarcatedMessages() { - String topicName = "validateComplexLeftPartialDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - 
runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDSTUFFå >å I Mean IT!å <å WILDSTUFFå >å <å WILDSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } - - @SuppressWarnings("unchecked") - @Test - public void validateComplexPartialMatchDemarcatedMessages() { - String topicName = "validateComplexPartialMatchDemarcatedMessages"; - StubPublishKafka putKafka = new StubPublishKafka(100); - TestRunner runner = TestRunners.newTestRunner(putKafka); - runner.setProperty(PublishKafka.TOPIC, topicName); - runner.setProperty(PublishKafka.CLIENT_ID, "foo"); - runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); - runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "å <å WILDSTUFFå >å "); - - runner.enqueue("Hello Worldå <å WILDSTUFFå >å Goodbyeå <å WILDBOOMSTUFFå >å ".getBytes(StandardCharsets.UTF_8)); - runner.run(1, false); - - runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1); - Producer<byte[], byte[]> producer = putKafka.getProducer(); - verify(producer, times(2)).send(Mockito.any(ProducerRecord.class)); - runner.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java deleted file 
mode 100644 index 5cee6bc..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -import org.junit.Test; - -public class PublishingContextTest { - - @Test - public void failInvalidConstructorArgs() { - try { - new PublishingContext(null, null); - fail(); - } catch (IllegalArgumentException e) { - // success - } - try { - new PublishingContext(mock(InputStream.class), null); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - new PublishingContext(mock(InputStream.class), ""); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - new PublishingContext(mock(InputStream.class), "mytopic", -3); - fail(); - } catch (IllegalArgumentException e) { - // success - } - } - - @Test - public void validateFullSetting() { - PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3); - publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8)); - publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8)); - - assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8)); - assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8)); - assertEquals("topic", publishingContext.getTopic()); - assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString()); - } - - @Test - public void validateOnlyOnceSetPerInstance() { - PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic"); - publishingContext.setKeyBytes(new byte[] { 0 }); - try { - publishingContext.setKeyBytes(new byte[] { 0 }); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - publishingContext.setDelimiterBytes(new byte[] { 0 }); - try { - publishingContext.setDelimiterBytes(new 
byte[] { 0 }); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - publishingContext.setMaxRequestSize(1024); - try { - publishingContext.setMaxRequestSize(1024); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - publishingContext.setMaxRequestSize(-10); - fail(); - } catch (IllegalArgumentException e) { - // success - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java deleted file mode 100644 index 0f0b23f..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.TopicPartition; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class StubConsumeKafka extends ConsumeKafka { - - private byte[][] values; - - public void setValues(byte[][] values) { - this.values = values; - } - - - @SuppressWarnings("unchecked") - @Override - protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext context, ProcessSession session) { - Consumer<byte[], byte[]> consumer = super.buildKafkaResource(context, session); - consumer = mock(Consumer.class); - String topicName = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); - - when(consumer.poll(Mockito.anyLong())).thenAnswer(new Answer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock invocation) throws Throwable { - List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(); - for (int i = 0; i < StubConsumeKafka.this.values.length; i++) { - byte[] value = StubConsumeKafka.this.values[i]; - ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(topicName, 0, 0, null, value); - records.add(record); - } - TopicPartition partition = new TopicPartition(topicName, 0); - Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> m = new LinkedHashMap<>(); - m.put(partition, records); - return new ConsumerRecords<>(m); - } - }); - - return consumer; - } -} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java deleted file mode 100644 index 3189356..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.nifi.processors.kafka.pubsub;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test double for {@link PublishKafka} that replaces the real Kafka producer
 * with a Mockito mock so publish/ack behavior can be exercised without a broker.
 *
 * <p>The mocked producer's {@code send(...)} is scripted by message payload:
 * <ul>
 *   <li>{@code "fail"} &mdash; throws {@code RuntimeException("intentional")}
 *       synchronously, once (subsequent sends succeed);</li>
 *   <li>{@code "futurefail"} &mdash; returns a {@link Future} that fails with
 *       {@link TopicAuthorizationException}, once;</li>
 *   <li>anything else &mdash; returns metadata for topic {@code "foo"},
 *       partition 0.</li>
 * </ul>
 *
 * <p>Not thread-safe beyond what the volatile fields provide; intended for
 * single-test use. Call {@link #destroy()} when done to stop the executor.
 */
public class StubPublishKafka extends PublishKafka {

    /** The mocked producer injected into the KafkaPublisher; exposed for verification. */
    private volatile Producer<byte[], byte[]> producer;

    /** Set once the first scripted failure has been delivered, so it fires only once. */
    private volatile boolean failed;

    /** Value reflected into KafkaPublisher's 'ackCheckSize' field. */
    private final int ackCheckSize;

    /** Backs the futures returned by the mocked send(); shut down via destroy(). */
    private final ExecutorService executor = Executors.newCachedThreadPool();

    StubPublishKafka(int ackCheckSize) {
        this.ackCheckSize = ackCheckSize;
    }

    /**
     * Returns the mocked producer, or {@code null} if
     * {@link #buildKafkaResource(ProcessContext, ProcessSession)} has not run yet.
     */
    public Producer<byte[], byte[]> getProducer() {
        return producer;
    }

    /** Stops the executor that services the mocked send() futures. */
    public void destroy() {
        this.executor.shutdownNow();
    }

    /**
     * Builds a {@link KafkaPublisher} whose internal producer is a Mockito mock.
     *
     * <p>Uses {@code Unsafe.allocateInstance} to create the publisher without
     * invoking its constructor, then reflectively wires in the mocked producer,
     * a mocked {@link ComponentLog}, and the configured ack-check size. Also
     * reflectively sets this processor's package-private {@code brokers} field
     * from the Bootstrap Servers property.
     *
     * @param context the process context supplying Kafka properties
     * @param session unused here; required by the overridden signature
     * @return the instrumented publisher
     * @throws ProcessException per the overridden contract (not thrown directly here)
     */
    @SuppressWarnings("unchecked")
    @Override
    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
            throws ProcessException {
        Properties kafkaProperties = this.buildKafkaProperties(context);
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        KafkaPublisher publisher;
        try {
            Field f = PublishKafka.class.getDeclaredField("brokers");
            f.setAccessible(true);
            f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
            // Allocate without running KafkaPublisher's constructor (which would
            // create a real KafkaProducer), then inject the mock instead.
            publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
            publisher.setAckWaitTime(15000);
            producer = mock(Producer.class);
            this.instrumentProducer(producer, false);
            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
            kf.setAccessible(true);
            kf.set(publisher, producer);

            Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
            componentLogF.setAccessible(true);
            componentLogF.set(publisher, mock(ComponentLog.class));

            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
            ackCheckSizeField.setAccessible(true);
            ackCheckSizeField.set(publisher, this.ackCheckSize);
        } catch (Exception e) {
            // Wrap with the cause preserved; no printStackTrace() here — the
            // original duplicated the trace before rethrowing.
            throw new IllegalStateException(e);
        }
        return publisher;
    }

    /**
     * Scripts the mocked producer's send() behavior (see class Javadoc for the
     * payload-keyed failure modes).
     *
     * @param producer the Mockito mock to stub
     * @param failRandomly currently unused; retained for signature stability
     *        (always passed {@code false} by buildKafkaResource)
     */
    @SuppressWarnings("unchecked")
    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {

        when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
            @Override
            public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
                String value = new String(record.value(), StandardCharsets.UTF_8);
                // Synchronous failure path: fires exactly once across the stub's lifetime.
                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                    StubPublishKafka.this.failed = true;
                    throw new RuntimeException("intentional");
                }
                // Asynchronous path: the failure (if any) surfaces from Future.get().
                Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
                    @Override
                    public RecordMetadata call() throws Exception {
                        if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
                            StubPublishKafka.this.failed = true;
                            throw new TopicAuthorizationException("Unauthorized");
                        } else {
                            TopicPartition partition = new TopicPartition("foo", 0);
                            RecordMetadata meta = new RecordMetadata(partition, 0, 0);
                            return meta;
                        }
                    }
                });
                return future;
            }
        });
    }
}
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

import sun.misc.Unsafe;

/**
 * Reflection helpers for the Kafka pub/sub tests: forcing values into final
 * fields and obtaining the {@code sun.misc.Unsafe} singleton so test doubles
 * can allocate objects without running their constructors.
 *
 * <p>Test-only code; deliberately relies on JDK-internal APIs.
 */
final class TestUtils {

    /** Static utility holder; never instantiated. */
    private TestUtils() {
    }

    /**
     * Overwrites {@code field} on {@code instance} with {@code newValue},
     * stripping the FINAL modifier first so final fields can be replaced.
     *
     * <p>NOTE(review): this mutates {@code java.lang.reflect.Field}'s internal
     * {@code "modifiers"} field, which is no longer reflectively reachable on
     * JDK 12+ (throws {@code NoSuchFieldException}) — confirm the JDK the test
     * suite runs on.
     *
     * @param field    the field to overwrite (made accessible here)
     * @param instance the target object, or {@code null} for a static field
     * @param newValue the value to install
     * @throws Exception any reflection failure, propagated to the caller
     */
    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
        field.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

        field.set(instance, newValue);
    }

    /**
     * Returns the JVM's {@link Unsafe} singleton by reading the private static
     * {@code theUnsafe} field (the normal {@code Unsafe.getUnsafe()} accessor
     * rejects non-bootclasspath callers).
     *
     * @return the shared {@code Unsafe} instance (same object on every call)
     * @throws IllegalStateException if the field cannot be read, with the
     *         underlying reflection failure as the cause
     */
    static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to obtain sun.misc.Unsafe", e);
        }
    }
}
