Repository: apex-malhar Updated Branches: refs/heads/master 6ddefd02a -> c89e63621
Fixes two issues: the metadata consumer now picks up the consumer properties, and setting the consumer properties from Properties.xml for the Kafka input operator, which was not working, now works. Added a test case. Removed trailing spaces. Removed unused imports. Removed static from the getPropertyAsString implementation in AbstractKafkaPartitioner. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c89e6362 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c89e6362 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c89e6362 Branch: refs/heads/master Commit: c89e636217a3416908fce7d57503f937cbdea2e9 Parents: 6ddefd0 Author: venkateshDT <[email protected]> Authored: Thu Sep 22 13:27:32 2016 -0700 Committer: venkateshDT <[email protected]> Committed: Mon Oct 24 13:48:59 2016 -0700 ---------------------------------------------------------------------- .../kafka/AbstractKafkaInputOperator.java | 2 +- .../malhar/kafka/AbstractKafkaPartitioner.java | 23 +++++- .../kafka/KafkaConsumerPropertiesTest.java | 79 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 4cf2888..6fc7693 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -131,7 +131,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera private int holdingBufferSize = 1024; - private Properties consumerProps; + private Properties 
consumerProps = new Properties(); /** * Assignment for each operator instance http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index ad5c3fa..c9b40be 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -18,6 +18,8 @@ */ package org.apache.apex.malhar.kafka; +import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -222,17 +224,36 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa metadataRefreshClients = new ArrayList<>(clusters.length); int index = 0; for (String c : clusters) { - Properties prop = new Properties(); + Properties prop = prototypeOperator.getConsumerProps(); prop.put("group.id", META_CONSUMER_GROUP_NAME); prop.put("bootstrap.servers", c); prop.put("key.deserializer", ByteArrayDeserializer.class.getName()); prop.put("value.deserializer", ByteArrayDeserializer.class.getName()); prop.put("enable.auto.commit", "false"); + if (logger.isInfoEnabled()) { + logger.info("Consumer Properties : {} ", getPropertyAsString(prop)); + } metadataRefreshClients.add(index++, new KafkaConsumer<byte[], byte[]>(prop)); } } /** + * Converts the property list (key and element pairs) to String format + * This format is used to print to a Stream for debugging. 
+ * @param prop + * @return String + */ + private String getPropertyAsString(Properties prop) { + StringWriter writer = new StringWriter(); + try { + prop.store(writer, ""); + } catch (IOException e) { + logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() ); + } + return writer.getBuffer().toString(); + } + + /** * The key object used in the assignment map for each operator */ public static class PartitionMeta http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java new file mode 100644 index 0000000..83e0de6 --- /dev/null +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.kafka; + +import java.text.ParseException; +import java.util.Properties; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.kafka.common.KafkaException; + +import com.datatorrent.api.Context; + +public class KafkaConsumerPropertiesTest +{ + + KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator(); + @Rule + public Watcher watcher = new Watcher(); + + public class Watcher extends TestWatcher + { + Context.OperatorContext context; + @Override + protected void starting(Description description) + { + super.starting(description); + kafkaInput.setClusters("localhost:8087"); + kafkaInput.setInitialPartitionCount(1); + kafkaInput.setTopics("apexTest"); + kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); + Properties prop = new Properties(); + prop.setProperty("security.protocol","SASL_PLAINTEXT"); + prop.setProperty("sasl.kerberos.service.name","kafka"); + kafkaInput.setConsumerProps(prop); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + } + } + + @Test + public void TestConsumerProperties() throws ParseException + { + //Added test on this check to ensure consumer properties are set and not reset between. + if (null != kafkaInput.getConsumerProps().get("security.protocol")) { + try { + kafkaInput.definePartitions(null, null); + } catch (KafkaException e) { + //Ensures the properties of the consumer are set/not reset. + Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " + + "secure mode.", e.getCause().getMessage()); + } + } + } +}
