[
https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254585#comment-15254585
]
ASF GitHub Bot commented on NIFI-1296:
--------------------------------------
Github user joewitt commented on a diff in the pull request:
https://github.com/apache/nifi/pull/366#discussion_r60797930
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.MockSessionFactory;
+import org.apache.nifi.util.SharedSessionState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class AbstractKafkaProcessorLifecycelTest {
+
+ @Test
+ public void validateBaseProperties() throws Exception {
+ TestRunner runner =
TestRunners.newTestRunner(DummyProcessor.class);
+ runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
+ runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("'bootstrap.servers'
validated against 'foo' is invalid"));
+ }
+ runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
+
+ runner.removeProperty(ConsumeKafka.TOPIC);
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("'topic' is invalid because
topic is required"));
+ }
+
+ runner.setProperty(ConsumeKafka.TOPIC, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.TOPIC, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+ runner.setProperty(ConsumeKafka.TOPIC, "blah");
+
+ runner.removeProperty(ConsumeKafka.CLIENT_ID);
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("invalid because client.id
is required"));
+ }
+
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.CLIENT_ID, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
+
+ runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+ runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one
character that is not white space"));
+ }
+ }
+
+ /*
+ * The goal of this test is to validate the correctness of
+ * AbstractKafkaProcessor's implementation of onTrigger() in a highly
+ * concurrent environment. That is:
+ * - upon processing failures (e.g., unhandled exceptions), the target
Kafka
+ * resource is reset (closed and re-created)
+ * - no FlowFile is unaccounted for. FlowFiles left in the queue and
FlowFiles
+ * in Success relationship = testCount
+ * - failed executions that did not result in the call to close/reset
summed with
+ * verified calls to close should equal total request failed
+ */
+ @Test
+ public void validateLifecycleCorrectnessWithProcessingFailures()
throws Exception {
--- End diff --
I see this fairly often.
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.811 sec
<<< FAILURE! - in
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
validateLifecycleCorrectnessWithProcessingFailures(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)
Time elapsed: 0.694 sec <<< FAILURE!
java.lang.AssertionError: expected:<10000> but was:<9999>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateLifecycleCorrectnessWithProcessingFailures(AbstractKafkaProcessorLifecycelTest.java:221)
> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
> Key: NIFI-1296
> URL: https://issues.apache.org/jira/browse/NIFI-1296
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 0.4.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0
>
>
> Not sure when can we address this, but the interesting comment in
> https://github.com/apache/nifi/pull/143. The usage of new API may introduce
> issues with running against older Kafka brokers (e.g., 0.8). Need to
> investigate.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)