more review comments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4ae563c6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4ae563c6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4ae563c6 Branch: refs/heads/NewKafkaSystemConsumer Commit: 4ae563c609aaaf0d2ceaeb90ab9ae33dfdc8d601 Parents: 5397a34 Author: Boris S <[email protected]> Authored: Mon Sep 24 12:31:25 2018 -0700 Committer: Boris S <[email protected]> Committed: Mon Sep 24 12:31:25 2018 -0700 ---------------------------------------------------------------------- .../samza/config/KafkaConsumerConfig.java | 19 ++--- .../samza/system/kafka/KafkaSystemConsumer.java | 40 +++++---- .../samza/system/kafka/KafkaSystemFactory.scala | 1 - .../samza/config/TestKafkaConsumerConfig.java | 85 ++++++++------------ .../system/kafka/TestKafkaSystemAdminJava.java | 18 ++--- .../system/kafka/TestKafkaSystemConsumer.java | 40 ++++----- 6 files changed, 84 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java index 4bbe00f..7d2408b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java @@ -23,13 +23,11 @@ package org.apache.samza.config; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.samza.SamzaException; -import org.apache.samza.config.JobConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -78,7 +76,6 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); - // These are values we enforce in sazma, and they cannot be overwritten. // Disable consumer auto-commit because Samza controls commits @@ -86,7 +83,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { // Translate samza config value to kafka config value consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - getAutoOffsetResetValue((String)consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); + getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))); // make sure bootstrap configs are in, if not - get them from the producer if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { @@ -148,9 +145,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { String jobName = config.get(JobConfig.JOB_NAME()); String jobId = (config.get(JobConfig.JOB_ID()) != null) ? config.get(JobConfig.JOB_ID()) : "1"; - return String.format("%s-%s-%s", id.replaceAll( - "\\W", "_"), - jobName.replaceAll("\\W", "_"), + return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"), jobId.replaceAll("\\W", "_")); } @@ -172,7 +167,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { final String KAFKA_OFFSET_NONE = "none"; if (autoOffsetReset == null) { - return KAFKA_OFFSET_LATEST; // return default + return KAFKA_OFFSET_LATEST; // return default } // accept kafka values directly @@ -184,15 +179,15 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { String newAutoOffsetReset; switch (autoOffsetReset) { case SAMZA_OFFSET_LARGEST: - newAutoOffsetReset = KAFKA_OFFSET_LATEST; + newAutoOffsetReset = KAFKA_OFFSET_LATEST; break; case SAMZA_OFFSET_SMALLEST: - newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; + newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; break; default: - newAutoOffsetReset = KAFKA_OFFSET_LATEST; + newAutoOffsetReset = KAFKA_OFFSET_LATEST; } - LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); + LOG.info("AutoOffsetReset value converted from {} to {}", autoOffsetReset, newAutoOffsetReset); return newAutoOffsetReset; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java index e5ded8d..17f29f1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.java @@ -31,11 +31,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import kafka.common.TopicAndPartition; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.samza.config.KafkaConsumerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.KafkaConsumerConfig; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemStreamPartition; @@ -69,7 +69,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // BlockeingEnvelopMap's buffers. final private KafkaConsumerProxy proxy; - // keep registration data until the start - mapping between registered SSPs and topicPartitions, and the offsets final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>(); final Map<TopicPartition, SystemStreamPartition> topicPartitionsToSSP = new HashMap<>(); @@ -102,7 +101,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // Create the proxy to do the actual message reading. String metricName = String.format("%s %s", systemName, clientId); proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, metrics, metricName); - LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy ); + LOG.info("{}: Created KafkaConsumerProxy {} ", this, proxy); LOG.info("{}: Created KafkaSystemConsumer {}", this, kafkaConsumer); } @@ -117,8 +116,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy public static KafkaConsumer<byte[], byte[]> getKafkaConsumerImpl(String systemName, String clientId, Config config) { // extract kafka client configs - KafkaConsumerConfig consumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); + KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId); LOG.info("{}:{} KafkaClient properties {}", systemName, clientId, consumerConfig); @@ -179,8 +177,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } catch (Exception e) { // all recoverable execptions are handled by the client. // if we get here there is nothing left to do but bail out. - String msg = String.format("%s: Got Exception while seeking to %s for partition %s", - this, startingOffsetString, tp); + String msg = + String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp); LOG.error(msg, e); throw new SamzaException(msg, e); } @@ -217,12 +215,12 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } int numTPs = topicPartitionsToSSP.size(); - if (numTPs == topicPartitionsToOffset.size()) { + if (numTPs != topicPartitionsToOffset.size()) { throw new SamzaException("topicPartitionsToSSP.size() doesn't match topicPartitionsToOffset.size()"); } - LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", - this, fetchThresholdBytes, fetchThreshold, numTPs); + LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; partitions num={}", this, fetchThresholdBytes, + fetchThreshold, numTPs); if (numTPs > 0) { perPartitionFetchThreshold = fetchThreshold / numTPs; @@ -231,8 +229,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy // currently this feature cannot be enabled, because we do not have the size of the messages available. // messages get double buffered, hence divide by 2 perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numTPs; - LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", - this, perPartitionFetchThresholdBytes); + LOG.info("{} :perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes={}", this, + perPartitionFetchThresholdBytes); } } } @@ -268,9 +266,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { if (started.get()) { - String msg = - String.format("%s: Trying to register partition after consumer has been started. ssp=%s", - this, systemStreamPartition); + String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this, + systemStreamPartition); throw new SamzaException(msg); } @@ -286,7 +283,6 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy topicPartitionsToSSP.put(tp, systemStreamPartition); - String existingOffset = topicPartitionsToOffset.get(tp); // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. if (existingOffset == null || compareOffsets(existingOffset, offset) > 0) { @@ -353,10 +349,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy } boolean needsMoreMessages(SystemStreamPartition ssp) { - LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" - + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled, - getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), - perPartitionFetchThreshold); + LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};" + + "(limit={}); messagesNumInQueue={}(limit={};", this, ssp, fetchThresholdBytesEnabled, + getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), + perPartitionFetchThreshold); if (fetchThresholdBytesEnabled) { return getMessagesSizeInQueue(ssp) < perPartitionFetchThresholdBytes; @@ -372,8 +368,8 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy put(ssp, envelope); } catch (InterruptedException e) { throw new SamzaException( - String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", - this, envelope.getOffset(), ssp)); + String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s", this, + envelope.getOffset(), ssp)); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index deaee56..ba5390b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -22,7 +22,6 @@ package org.apache.samza.system.kafka import java.util.Properties import kafka.utils.ZkUtils -import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java index 719ea22..35a717a 100644 --- a/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java +++ b/samza-kafka/src/test/java/org/apache/samza/config/TestKafkaConsumerConfig.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.samza.SamzaException; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; @@ -35,84 +34,67 @@ public class TestKafkaConsumerConfig { public final static String KAFKA_CONSUMER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".consumer."; private final static String CLIENT_ID = "clientId"; - @Before - public void setProps() { - - } - @Test public void testDefaults() { props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // should be ignored - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "Ignore"); // should be ignored - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); // should NOT be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + "Ignore"); // should be ignored + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + "100"); // should NOT be ignored // if KAFKA_CONSUMER_PROPERTY_PREFIX is set, then PRODUCER should be ignored props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "ignroeThis:9092"); props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + "bootstrap.servers", "useThis:9092"); Config config = new MapConfig(props); - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - config, SYSTEM_NAME, CLIENT_ID); + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); Assert.assertEquals("false", kafkaConsumerConfig.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); - Assert.assertEquals( - KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS, + Assert.assertEquals(KafkaConsumerConfig.DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS, kafkaConsumerConfig.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); - Assert.assertEquals( - RangeAssignor.class.getName(), + Assert.assertEquals(RangeAssignor.class.getName(), kafkaConsumerConfig.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); - Assert.assertEquals( - "useThis:9092", - kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - Assert.assertEquals( - "100", - kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assert.assertEquals("100", kafkaConsumerConfig.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - Assert.assertEquals( - ByteArrayDeserializer.class.getName(), + Assert.assertEquals(ByteArrayDeserializer.class.getName(), kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); - Assert.assertEquals( - ByteArrayDeserializer.class.getName(), - kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) ); + Assert.assertEquals(ByteArrayDeserializer.class.getName(), + kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); - Assert.assertEquals( - CLIENT_ID, - kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); + Assert.assertEquals(CLIENT_ID, kafkaConsumerConfig.get(ConsumerConfig.CLIENT_ID_CONFIG)); - Assert.assertEquals( - KafkaConsumerConfig.getConsumerGroupId(config), + Assert.assertEquals(KafkaConsumerConfig.getConsumerGroupId(config), kafkaConsumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG)); } - @Test // test stuff that should not be overridden + @Test public void testNotOverride() { // if KAFKA_CONSUMER_PROPERTY_PREFIX is not set, then PRODUCER should be used props.put(KAFKA_PRODUCER_PROPERTY_PREFIX + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "useThis:9092"); - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); - props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestKafkaConsumerConfig.class.getName()); - + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + TestKafkaConsumerConfig.class.getName()); + props.put(KAFKA_CONSUMER_PROPERTY_PREFIX + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + TestKafkaConsumerConfig.class.getName()); Config config = new MapConfig(props); - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - config, SYSTEM_NAME, CLIENT_ID); + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, SYSTEM_NAME, CLIENT_ID); - Assert.assertEquals( - "useThis:9092", - kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Assert.assertEquals("useThis:9092", kafkaConsumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - Assert.assertEquals( - TestKafkaConsumerConfig.class.getName(), + Assert.assertEquals(TestKafkaConsumerConfig.class.getName(), kafkaConsumerConfig.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); - Assert.assertEquals( - TestKafkaConsumerConfig.class.getName(), + Assert.assertEquals(TestKafkaConsumerConfig.class.getName(), kafkaConsumerConfig.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); } @@ -122,30 +104,29 @@ public class TestKafkaConsumerConfig { map.put(JobConfig.JOB_NAME(), "jobName"); map.put(JobConfig.JOB_ID(), "jobId"); - String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + String result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-jobName-jobId", result); - result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map)); + result = KafkaConsumerConfig.getConsumerClientId("consumer-", new MapConfig(map)); Assert.assertEquals("consumer_-jobName-jobId", result); - result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map)); + result = KafkaConsumerConfig.getConsumerClientId("super-duper-consumer", new MapConfig(map)); Assert.assertEquals("super_duper_consumer-jobName-jobId", result); map.put(JobConfig.JOB_NAME(), " very important!job"); - result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-_very_important_job-jobId", result); map.put(JobConfig.JOB_ID(), "number-#3"); - result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); + result = KafkaConsumerConfig.getConsumerClientId("consumer", new MapConfig(map)); Assert.assertEquals("consumer-_very_important_job-number__3", result); } - - @Test(expected = SamzaException.class) public void testNoBootstrapServers() { - KafkaConsumerConfig kafkaConsumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig( - new MapConfig(Collections.emptyMap()), SYSTEM_NAME, "clientId"); + KafkaConsumerConfig kafkaConsumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(new MapConfig(Collections.emptyMap()), SYSTEM_NAME, + "clientId"); Assert.fail("didn't get exception for the missing config:" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); } http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index 77f47f9..7e968bf 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -19,17 +19,14 @@ package org.apache.samza.system.kafka; -import java.util.*; import java.util.HashMap; import java.util.Map; - +import java.util.Properties; import kafka.api.TopicMetadata; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.util.ScalaJavaUtil; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -71,7 +68,6 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { admin.createStream(spec); admin.validateStream(spec); - } @Test @@ -143,7 +139,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { public void testCreateStream() { StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec)); systemAdmin().validateStream(spec); assertFalse("createStream should return false if the stream already exists.", systemAdmin().createStream(spec)); @@ -162,7 +159,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8); StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec1)); systemAdmin().validateStream(spec2); } @@ -172,7 +170,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8); StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec1)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec1)); systemAdmin().validateStream(spec2); } @@ -181,7 +180,8 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { public void testClearStream() { StreamSpec spec = new StreamSpec("testId", "testStreamClear", "testSystem", 8); - assertTrue("createStream should return true if the stream does not exist and then is created.", systemAdmin().createStream(spec)); + assertTrue("createStream should return true if the stream does not exist and then is created.", + systemAdmin().createStream(spec)); assertTrue(systemAdmin().clearStream(spec)); scala.collection.immutable.Set<String> topic = new scala.collection.immutable.Set.Set1<>(spec.getPhysicalName()); http://git-wip-us.apache.org/repos/asf/samza/blob/4ae563c6/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java index 9e8ff44..933558c 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java @@ -21,24 +21,22 @@ package org.apache.samza.system.kafka; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.samza.config.KafkaConsumerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.KafkaConsumerConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.Clock; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; @@ -52,12 +50,7 @@ public class TestKafkaSystemConsumer { public final String FETCH_THRESHOLD_MSGS = "50000"; public final String FETCH_THRESHOLD_BYTES = "100000"; - @Before - public void setUp() { - - } - - private KafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) { + private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) { final Map<String, String> map = new HashMap<>(); map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); @@ -70,8 +63,8 @@ public class TestKafkaSystemConsumer { KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID); final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig); - MockKafkaSystmeCosumer newKafkaSystemConsumer = - new MockKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, + MockKafkaSystemConsumer newKafkaSystemConsumer = + new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); return newKafkaSystemConsumer; @@ -80,7 +73,7 @@ public class TestKafkaSystemConsumer { @Test public void testConfigValidations() { - final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); consumer.start(); // should be no failures @@ -88,7 +81,7 @@ public class TestKafkaSystemConsumer { @Test public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { - final KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); final int partitionsNum = 50; for (int i = 0; i < partitionsNum; i++) { consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0"); @@ -99,12 +92,14 @@ public class TestKafkaSystemConsumer { Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold); Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, consumer.perPartitionFetchThresholdBytes); + + consumer.stop(); } @Test public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { - KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); @@ -137,7 +132,7 @@ public class TestKafkaSystemConsumer { bytesSerde.serialize("", "value1".getBytes()), ime1Size); IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), bytesSerde.serialize("", "value11".getBytes()), ime11Size); - KafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); consumer.register(ssp0, "0"); consumer.register(ssp1, "0"); @@ -156,6 +151,8 @@ public class TestKafkaSystemConsumer { Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); } @Test @@ -178,7 +175,7 @@ public class TestKafkaSystemConsumer { // limit by number of messages 4/2 = 2 per partition // limit by number of bytes - disabled - KafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable + KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable consumer.register(ssp0, "0"); consumer.register(ssp1, "0"); @@ -197,6 +194,8 @@ public class TestKafkaSystemConsumer { Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); } // mock kafkaConsumer and SystemConsumer @@ -206,17 +205,12 @@ public class TestKafkaSystemConsumer { } } - static class MockKafkaSystmeCosumer extends KafkaSystemConsumer { - public MockKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, + static class MockKafkaSystemConsumer extends KafkaSystemConsumer { + public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { super(kafkaConsumer, systemName, config, clientId, metrics, clock); } - //@Override - //void createConsumerProxy() { - // this.messageSink = new KafkaConsumerMessageSink(); - //} - @Override void startConsumer() { }
