[ 
https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254585#comment-15254585
 ] 

ASF GitHub Bot commented on NIFI-1296:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60797930
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
 ---
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.Closeable;
    +import java.lang.reflect.Field;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.MockProcessSession;
    +import org.apache.nifi.util.MockSessionFactory;
    +import org.apache.nifi.util.SharedSessionState;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Test;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +public class AbstractKafkaProcessorLifecycelTest {
    +
    +    @Test
    +    public void validateBaseProperties() throws Exception {
    +        TestRunner runner = 
TestRunners.newTestRunner(DummyProcessor.class);
    +        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
    +        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
    +
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'bootstrap.servers' 
validated against 'foo' is invalid"));
    +        }
    +        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
    +
    +        runner.removeProperty(ConsumeKafka.TOPIC);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("'topic' is invalid because 
topic is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.TOPIC, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.TOPIC, "blah");
    +
    +        runner.removeProperty(ConsumeKafka.CLIENT_ID);
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("invalid because client.id 
is required"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
    +
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
    +        try {
    +            runner.assertValid();
    +            fail();
    +        } catch (AssertionError e) {
    +            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
    +        }
    +    }
    +
    +    /*
    +     * The goal of this test is to validate the correctness of
    +     * AbstractKafkaProcessor's implementation of onTrigger() in a highly
    +     * concurrent environment. That is:
    +     * - upon processing failures (e.g., unhandled exceptions), the target 
Kafka
    +     *   resource is reset (closed and re-created)
    +     * - no FlowFile is unaccounted for. FlowFiles left in the queue and 
FlowFiles
    +     *   in Success relationship = testCount
    +     * - failed executions that did not result in the call to close/reset 
summed with
    +     *   verified calls to close should equal total request failed
    +     */
    +    @Test
    +    public void validateLifecycleCorrectnessWithProcessingFailures() 
throws Exception {
    --- End diff --
    
    I see this fairly often.
    
    
    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running 
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.811 sec 
<<< FAILURE! - in 
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest
    
validateLifecycleCorrectnessWithProcessingFailures(org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest)
  Time elapsed: 0.694 sec  <<< FAILURE!
    java.lang.AssertionError: expected:<10000> but was:<9999>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at 
org.apache.nifi.processors.kafka.pubsub.AbstractKafkaProcessorLifecycelTest.validateLifecycleCorrectnessWithProcessingFailures(AbstractKafkaProcessorLifecycelTest.java:221)



> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
>                 Key: NIFI-1296
>                 URL: https://issues.apache.org/jira/browse/NIFI-1296
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.4.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> Not sure when can we address this, but the interesting comment in 
> https://github.com/apache/nifi/pull/143. The usage of new API may introduce 
> issues with running against older Kafka brokers (e.g., 0.8). Need to 
> investigate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to