Repository: samza
Updated Branches:
  refs/heads/master 3c78e06ac -> 63d33fa06
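The diffs below rework the Kafka test suite around the new KafkaSystemAdmin constructor, which takes a Config and a Kafka Consumer instance instead of a broker list plus a ZkUtils connection. As a rough sketch of the construction path the updated tests exercise (assembled from calls that appear in the diffs; the system name and config contents are illustrative):

  // Build a KafkaSystemAdmin for a system from a Samza Config.
  Config config = new MapConfig(configMap);
  String clientId = KafkaConsumerConfig.createClientId("clientPrefix", config);
  KafkaConsumerConfig consumerConfig =
      KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, "testSystem", clientId);
  // Real consumer for integration tests; unit tests pass a Mockito mock instead.
  Consumer<byte[], byte[]> consumer =
      KafkaSystemConsumer.createKafkaConsumerImpl("testSystem", consumerConfig);
  KafkaSystemAdmin admin = new KafkaSystemAdmin("testSystem", config, consumer);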
http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java new file mode 100644 index 0000000..6a03198 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +public class TestKafkaSystemAdminWithMock { + private KafkaSystemAdmin kafkaSystemAdmin; + private Config testConfig; + private Consumer<byte[], byte[]> mockKafkaConsumer; + private PartitionInfo mockPartitionInfo0; + private PartitionInfo mockPartitionInfo1; + private TopicPartition testTopicPartition0; + private TopicPartition testTopicPartition1; + + private ConcurrentHashMap<String, KafkaSystemConsumer> consumersReference; + + private static final String VALID_TOPIC = "validTopic"; + private static final String INVALID_TOPIC = "invalidTopic"; + private static final String TEST_SYSTEM = "testSystem"; + private static final Long
KAFKA_BEGINNING_OFFSET_FOR_PARTITION0 = 10L; + private static final Long KAFKA_BEGINNING_OFFSET_FOR_PARTITION1 = 11L; + private static final Long KAFKA_END_OFFSET_FOR_PARTITION0 = 20L; + private static final Long KAFKA_END_OFFSET_FOR_PARTITION1 = 21L; + + @Before + public void setUp() throws Exception { + Map<String, String> configMap = new HashMap<>(); + configMap.put(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + "localhost:123"); + configMap.put(String.format(KafkaConfig.CONSUMER_ZK_CONNECT_CONFIG_KEY(), TEST_SYSTEM), "localhost:124"); + configMap.put(JobConfig.JOB_NAME(), "jobName"); + configMap.put(JobConfig.JOB_ID(), "jobId"); + + testConfig = new MapConfig(configMap); + + consumersReference = new ConcurrentHashMap<>(); + + // mock PartitionInfo + mockPartitionInfo0 = mock(PartitionInfo.class); + when(mockPartitionInfo0.topic()).thenReturn(VALID_TOPIC); + when(mockPartitionInfo0.partition()).thenReturn(0); + mockPartitionInfo1 = mock(PartitionInfo.class); + when(mockPartitionInfo1.topic()).thenReturn(VALID_TOPIC); + when(mockPartitionInfo1.partition()).thenReturn(1); + + // mock KafkaConsumer construction + mockKafkaConsumer = mock(KafkaConsumer.class); + + // mock the KafkaConsumer partition and offset lookups + testTopicPartition0 = new TopicPartition(VALID_TOPIC, 0); + testTopicPartition1 = new TopicPartition(VALID_TOPIC, 1); + Map<TopicPartition, Long> testBeginningOffsets = + ImmutableMap.of(testTopicPartition0, KAFKA_BEGINNING_OFFSET_FOR_PARTITION0, testTopicPartition1, + KAFKA_BEGINNING_OFFSET_FOR_PARTITION1); + Map<TopicPartition, Long> testEndOffsets = + ImmutableMap.of(testTopicPartition0, KAFKA_END_OFFSET_FOR_PARTITION0, testTopicPartition1, + KAFKA_END_OFFSET_FOR_PARTITION1); + + when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenReturn( + ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1)); + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( + testBeginningOffsets); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( + testEndOffsets); + + kafkaSystemAdmin = + new KafkaSystemAdmin(TEST_SYSTEM, testConfig, mockKafkaConsumer); + } + + @After + public void tearDown() { + } + + @Test + public void testGetSystemStreamMetaDataWithValidTopic() { + Map<String, SystemStreamMetadata> metadataMap = + kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC)); + + // verify metadata size + assertEquals("metadata should return for 1 topic", metadataMap.size(), 1); + + // verify the metadata streamName + assertEquals("the stream name should be " + VALID_TOPIC, metadataMap.get(VALID_TOPIC).getStreamName(), VALID_TOPIC); + + // verify the offset for each partition + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> systemStreamPartitionMetadata = + metadataMap.get(VALID_TOPIC).getSystemStreamPartitionMetadata(); + assertEquals("there are 2 partitions", systemStreamPartitionMetadata.size(), 2); + + SystemStreamMetadata.SystemStreamPartitionMetadata partition0Metadata = + systemStreamPartitionMetadata.get(new Partition(0)); + assertEquals("oldest offset for partition 0", partition0Metadata.getOldestOffset(), + KAFKA_BEGINNING_OFFSET_FOR_PARTITION0.toString()); + assertEquals("upcoming offset for partition 0", partition0Metadata.getUpcomingOffset(), + KAFKA_END_OFFSET_FOR_PARTITION0.toString()); + assertEquals("newest offset for partition 0", partition0Metadata.getNewestOffset(), + Long.toString(KAFKA_END_OFFSET_FOR_PARTITION0 - 1)); + + SystemStreamMetadata.SystemStreamPartitionMetadata partition1Metadata = + systemStreamPartitionMetadata.get(new Partition(1)); + assertEquals("oldest offset for partition 1", partition1Metadata.getOldestOffset(), + KAFKA_BEGINNING_OFFSET_FOR_PARTITION1.toString()); + assertEquals("upcoming offset for partition 1", partition1Metadata.getUpcomingOffset(), + KAFKA_END_OFFSET_FOR_PARTITION1.toString()); + assertEquals("newest offset for partition 1", partition1Metadata.getNewestOffset(), + Long.toString(KAFKA_END_OFFSET_FOR_PARTITION1 - 1)); + } + + @Test + public void testGetSystemStreamMetaDataWithInvalidTopic() { + Map<String, SystemStreamMetadata> metadataMap = + kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(INVALID_TOPIC)); + assertEquals("empty metadata for invalid topic", metadataMap.size(), 0); + } + + @Test + public void testGetSystemStreamMetaDataWithNoTopic() { + Map<String, SystemStreamMetadata> metadataMap = kafkaSystemAdmin.getSystemStreamMetadata(Collections.emptySet()); + assertEquals("empty metadata for no topic", metadataMap.size(), 0); + } + + @Test + public void testGetSystemStreamMetaDataForTopicWithNoMessage() { + // A topic with no messages will have beginningOffset = 0 and endOffset = 0 + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( + ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L)); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( + ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L)); + + Map<String, SystemStreamMetadata> metadataMap = + kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC)); + assertEquals("metadata should return for 1 topic", metadataMap.size(), 1); + + // verify the metadata streamName + assertEquals("the stream name should be " + VALID_TOPIC, metadataMap.get(VALID_TOPIC).getStreamName(), VALID_TOPIC); + + // verify the offset for each partition + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> systemStreamPartitionMetadata = + metadataMap.get(VALID_TOPIC).getSystemStreamPartitionMetadata(); + assertEquals("there are 2 partitions", systemStreamPartitionMetadata.size(), 2); + + SystemStreamMetadata.SystemStreamPartitionMetadata partition0Metadata = + systemStreamPartitionMetadata.get(new Partition(0)); + assertEquals("oldest offset for partition 0", partition0Metadata.getOldestOffset(), "0"); + assertEquals("upcoming offset for partition 0", partition0Metadata.getUpcomingOffset(), "0"); + assertEquals("newest offset is not set due to abnormal upcoming offset", partition0Metadata.getNewestOffset(), + null); + + SystemStreamMetadata.SystemStreamPartitionMetadata partition1Metadata = + systemStreamPartitionMetadata.get(new Partition(1)); + assertEquals("oldest offset for partition 1", partition1Metadata.getOldestOffset(), "0"); + assertEquals("upcoming offset for partition 1", partition1Metadata.getUpcomingOffset(), "0"); + assertEquals("newest offset is not set due to abnormal upcoming offset", partition1Metadata.getNewestOffset(), + null); + } + + @Test + public void testGetSSPMetadata() { + SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new
Partition(0)); + SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1)); + TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0); + TopicPartition otherTopicPartition = new TopicPartition("otherTopic", 1); + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 1L, otherTopicPartition, 2L)); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 11L, otherTopicPartition, 12L)); + Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected = + ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("1", "10", "11"), otherSSP, + new SystemStreamMetadata.SystemStreamPartitionMetadata("2", "11", "12")); + assertEquals(kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp, otherSSP)), expected); + } + + @Test + public void testGetSSPMetadataEmptyPartition() { + SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0)); + SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1)); + TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0); + TopicPartition otherTopicPartition = new TopicPartition("otherTopic", 1); + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 1L)); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 11L)); + + Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected = + ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("1", "10", "11"), otherSSP, + new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null)); + assertEquals(expected, kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp, otherSSP))); + } + + @Test + public void testGetSSPMetadataEmptyUpcomingOffset() { + SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0)); + TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0); + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 0L)); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition))).thenReturn(ImmutableMap.of()); + Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected = + ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("0", null, null)); + assertEquals(kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp)), expected); + } + + @Test + public void testGetSSPMetadataZeroUpcomingOffset() { + SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0)); + TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0); + when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition))).thenReturn( + ImmutableMap.of(topicPartition, -1L)); + when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition))).thenReturn( + ImmutableMap.of(topicPartition, 0L)); + Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected = + ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("0", null, "0")); + assertEquals(kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp)), 
expected); + } + + @Test + public void testGetSystemStreamMetaDataWithRetry() { + final List<PartitionInfo> partitionInfosForTopic = ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1); + when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException()) + .thenReturn(partitionInfosForTopic); + + Map<String, SystemStreamMetadata> metadataMap = + kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC)); + assertEquals("metadata should return for 1 topic", metadataMap.size(), 1); + + // retried twice because the first fails and the second succeeds + Mockito.verify(mockKafkaConsumer, Mockito.times(2)).partitionsFor(VALID_TOPIC); + + final List<TopicPartition> topicPartitions = + Arrays.asList(new TopicPartition(mockPartitionInfo0.topic(), mockPartitionInfo0.partition()), + new TopicPartition(mockPartitionInfo1.topic(), mockPartitionInfo1.partition())); + // the following methods thereafter are only called once + Mockito.verify(mockKafkaConsumer, Mockito.times(1)).beginningOffsets(topicPartitions); + Mockito.verify(mockKafkaConsumer, Mockito.times(1)).endOffsets(topicPartitions); + } + + @Test(expected = SamzaException.class) + public void testGetSystemStreamMetadataShouldTerminateAfterFiniteRetriesOnException() { + when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()); + + kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC)); + } + + @Test(expected = SamzaException.class) + public void testGetSystemStreamPartitionCountsShouldTerminateAfterFiniteRetriesOnException() throws Exception { + final Set<String> streamNames = ImmutableSet.of(VALID_TOPIC); + final long cacheTTL = 100L; + + when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()) + .thenThrow(new RuntimeException()); + + kafkaSystemAdmin.getSystemStreamPartitionCounts(streamNames, cacheTTL); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java new file mode 100644 index 0000000..981ac45 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.KafkaConsumerConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestKafkaSystemConsumer { + public final String TEST_SYSTEM = "test-system"; + public final String TEST_STREAM = "test-stream"; + public final String TEST_JOB = "test-job"; + public final String TEST_PREFIX_ID = "testClientId"; + public final String BOOTSTRAP_SERVER = "127.0.0.1:8888"; + public final String FETCH_THRESHOLD_MSGS = "50000"; + public final String FETCH_THRESHOLD_BYTES = "100000"; + + private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) { + final Map<String, String> map = new HashMap<>(); + + map.put(JobConfig.JOB_NAME(), TEST_JOB); + + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes); + map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + BOOTSTRAP_SERVER); + + Config config = new MapConfig(map); + String clientId = KafkaConsumerConfig.createClientId(TEST_PREFIX_ID, config); + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, clientId); + + final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig); + + MockKafkaSystemConsumer newKafkaSystemConsumer = + new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_PREFIX_ID, + new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); + + return newKafkaSystemConsumer; + } + + @Test + public void testConfigValidations() { + + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.start(); + // should be no failures + } + + @Test + public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { + final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final int partitionsNum = 50; + for (int i = 0; i < partitionsNum; i++) { + consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0"); + } + + consumer.start(); + + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold); + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, + consumer.perPartitionFetchThresholdBytes); + + consumer.stop(); + } + + @Test + public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { + + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2)); + + consumer.register(ssp0, "0"); + consumer.register(ssp0, "5"); + consumer.register(ssp1, "2"); + consumer.register(ssp1, "3"); + consumer.register(ssp2, "0"); + + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); + assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); + assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); + } + + @Test + public void testFetchThresholdBytes() { + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size + int ime11Size = 20; + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // queue for ssp1 should be less than full now, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // queue for ssp1 should be full now, because we added a message of size 20 on top + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); + } + + @Test + public void testFetchThresholdBytesDisabled() { + // Pass 0 as fetchThresholdBytes, which disables the size-based limit + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit + int ime11Size = 20; // even with the second message, still below the size limit + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + + // limit by number of messages 4/2 = 2 per partition + // limit by number of bytes - disabled + KafkaSystemConsumer consumer = createConsumer("4", "0"); // fetchThresholdBytes = 0 disables the size check + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // would be full by size, but the size check is disabled and it is not full by message count (1 of 2) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // not full by size or by message count + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // not full by size, but full by message count + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + + consumer.stop(); + } + + // mocks for the KafkaConsumer and the KafkaSystemConsumer + static class MockKafkaConsumer extends KafkaConsumer { + public MockKafkaConsumer(Map<String, Object> configs) { + super(configs); + } + } + + static class MockKafkaSystemConsumer extends KafkaSystemConsumer { + public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + super(kafkaConsumer, systemName, config, clientId, metrics, clock); + } + + @Override + void startConsumer() { + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java new file mode 100644 index 0000000..03b0564 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.Map; +import kafka.common.TopicAndPartition; +import org.apache.samza.metrics.Metric; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.ReadableMetricsRegistry; +import org.junit.Assert; +import org.junit.Test; + + +public class TestKafkaSystemConsumerMetrics { + @Test + public void testKafkaSystemConsumerMetrics() { + String systemName = "system"; + TopicAndPartition tp1 = new TopicAndPartition("topic1", 1); + TopicAndPartition tp2 = new TopicAndPartition("topic2", 2); + String clientName = "clientName"; + + // record expected values for further comparison + Map<String, String> expectedValues = new HashMap<>(); + + ReadableMetricsRegistry registry = new MetricsRegistryMap(); + KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry); + + // initialize the metrics for the partitions + metrics.registerTopicAndPartition(tp1); + metrics.registerTopicAndPartition(tp2); + + // initialize the metrics for the host:port + metrics.registerClientProxy(clientName); + + metrics.setOffsets(tp1, 1001); + metrics.setOffsets(tp2, 1002); + expectedValues.put(metrics.offsets().get(tp1).getName(), "1001"); + expectedValues.put(metrics.offsets().get(tp2).getName(), "1002"); + + metrics.incBytesReads(tp1, 10); + metrics.incBytesReads(tp1, 5); // total 15 + expectedValues.put(metrics.bytesRead().get(tp1).getName(), "15"); + + metrics.incReads(tp1); + metrics.incReads(tp1); // total 2 + expectedValues.put(metrics.reads().get(tp1).getName(), "2"); + + metrics.setHighWatermarkValue(tp2, 1000); + metrics.setHighWatermarkValue(tp2, 1001); // final value 1001 + expectedValues.put(metrics.highWatermark().get(tp2).getName(), "1001"); + + metrics.setLagValue(tp1, 200); + metrics.setLagValue(tp1, 201); // final value 201 + expectedValues.put(metrics.lag().get(tp1).getName(), "201"); + + metrics.incClientBytesReads(clientName, 100); // broker-bytes-read + metrics.incClientBytesReads(clientName, 110); // total 210 + expectedValues.put(metrics.clientBytesRead().get(clientName).getName(), "210"); + + metrics.incClientReads(clientName); // messages-read + metrics.incClientReads(clientName); // total 2 + expectedValues.put(metrics.clientReads().get(clientName).getName(), "2"); + + metrics.setNumTopicPartitions(clientName, 2); // "topic-partitions" + metrics.setNumTopicPartitions(clientName, 3); // final value 3 + expectedValues.put(metrics.topicPartitions().get(clientName).getName(), "3"); + + + String groupName = metrics.group(); + Assert.assertEquals(groupName, KafkaSystemConsumerMetrics.class.getName()); + Assert.assertEquals(metrics.systemName(), systemName); + + Map<String, Metric> metricMap = registry.getGroup(groupName); + validate(metricMap, expectedValues); + } + + protected static void validate(Map<String, Metric> metricMap, Map<String, String> expectedValues) { + // match the expected value, set in the test above, and the value in the metrics + for(Map.Entry<String, String> e: expectedValues.entrySet()) { + String metricName = 
e.getKey(); + String expectedValue = e.getValue(); + // get the metric from the registry + String actualValue = metricMap.get(metricName).toString(); + + //System.out.println("name=" + metricName + " expVal=" + expectedValue + " actVal=" + actualValue); + Assert.assertEquals("failed for metricName=" + metricName, actualValue, expectedValue); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index cd511f2..1570363 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -21,22 +21,18 @@ package org.apache.samza.system.kafka -import java.util.{Properties, UUID} - import kafka.admin.AdminUtils -import org.apache.kafka.common.errors.LeaderNotAvailableException -import org.apache.kafka.common.protocol.Errors import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, ZkUtils} +import kafka.utils.TestUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.security.JaasUtils import org.apache.samza.Partition -import org.apache.samza.config.KafkaProducerConfig +import org.apache.samza.config._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition} -import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore} +import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore} import org.junit.Assert._ import org.junit._ @@ -53,6 +49,9 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { val TOTAL_PARTITIONS = 50 val REPLICATION_FACTOR = 2 val zkSecure = JaasUtils.isZkSecurityEnabled() + val KAFKA_CONSUMER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".consumer." + val KAFKA_PRODUCER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".producer." 
+ protected def numBrokers: Int = 3 @@ -69,14 +68,19 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { @BeforeClass override def setUp() { super.setUp() - val config = new java.util.HashMap[String, String]() - config.put("bootstrap.servers", brokerList) - config.put("acks", "all") - config.put("serializer.class", "kafka.serializer.StringEncoder") - producerConfig = new KafkaProducerConfig("kafka", "i001", config) + val map = new java.util.HashMap[String, String]() + map.put("bootstrap.servers", brokerList) + map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + map.put("acks", "all") + map.put("serializer.class", "kafka.serializer.StringEncoder") + + producerConfig = new KafkaProducerConfig("kafka", "i001", map) producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name") - systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) + systemAdmin = createSystemAdmin(SYSTEM, map) + systemAdmin.start() } @@ -122,7 +126,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { } def getConsumerConnector(): ConsumerConnector = { - val props = new Properties + val props = new java.util.Properties props.put("zookeeper.connect", zkConnect) props.put("group.id", "test") @@ -132,18 +136,36 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { Consumer.create(consumerConfig) } - def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = { - new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, - coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false) - } + def createSystemAdmin(system: String, map: java.util.Map[String, String]) = { + // required configs: bootstrap servers, ZooKeeper connect and job name + map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + brokerList) + map.put(KAFKA_PRODUCER_PROPERTY_PREFIX + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + brokerList) + map.put(JobConfig.JOB_NAME, "job.name") + map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect) + + val config: Config = new MapConfig(map) + val res = KafkaSystemAdminUtilsScala.getIntermediateStreamProperties(config) + + val clientId = KafkaConsumerConfig.createClientId("clientPrefix", config) + // extract kafka client configs + val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system, clientId) + + new KafkaSystemAdmin( + system, + config, + KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig)) + } } /** - * Test creates a local ZK and Kafka cluster, and uses it to create and test - * topics for to verify that offset APIs in SystemAdmin work as expected. - */ + * Test creates a local ZK and Kafka cluster, and uses it to create and test + * topics to verify that offset APIs in SystemAdmin work as expected.
+ */ class TestKafkaSystemAdmin { + import TestKafkaSystemAdmin._ @Test @@ -163,7 +185,7 @@ class TestKafkaSystemAdmin { new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2", new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3", new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4") - val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets) + val metadata = KafkaSystemAdminUtilsScala.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets) assertNotNull(metadata) assertEquals(2, metadata.size) assertTrue(metadata.contains("stream1")) @@ -271,7 +293,9 @@ class TestKafkaSystemAdmin { @Test def testShouldCreateCoordinatorStream { val topic = "test-coordinator-stream" - val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3) + val map = new java.util.HashMap[String, String]() + map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "3") + val systemAdmin = createSystemAdmin(SYSTEM, map) val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") systemAdmin.createStream(spec) @@ -284,30 +308,6 @@ class TestKafkaSystemAdmin { assertEquals(3, partitionMetadata.replicas.size) } - class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) { - import kafka.api.TopicMetadata - var metadataCallCount = 0 - - // Simulate Kafka telling us that the leader for the topic is not available - override def getTopicMetadata(topics: Set[String]) = { - metadataCallCount += 1 - val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE) - Map("quux" -> topicMetadata) - } - } - - @Test - def testShouldRetryOnTopicMetadataError { - val systemAdmin = new KafkaSystemAdminWithTopicMetadataError - val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3) - try { - systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff) - fail("expected CallLimitReached to be thrown") - } catch { - case e: ExponentialSleepStrategy.CallLimitReached => () - } - } - @Test def testGetNewestOffset { createTopic(TOPIC2, 16) @@ -335,17 +335,4 @@ class TestKafkaSystemAdmin { assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0)) assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0)) } - - @Test (expected = classOf[LeaderNotAvailableException]) - def testGetNewestOffsetMaxRetry { - val expectedRetryCount = 3 - val systemAdmin = new KafkaSystemAdminWithTopicMetadataError - try { - systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3) - } catch { - case e: Exception => - assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount) - throw e - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java deleted file mode 100644 index 5791545..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.system.kafka; - -import java.util.HashMap; -import java.util.Map; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.config.KafkaConfig; -import org.apache.samza.config.KafkaConsumerConfig; -import org.apache.samza.config.MapConfig; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.Clock; -import org.apache.samza.util.NoOpMetricsRegistry; -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.*; - - -public class TestKafkaSystemConsumer { - public final String TEST_SYSTEM = "test-system"; - public final String TEST_STREAM = "test-stream"; - public final String TEST_CLIENT_ID = "testClientId"; - public final String BOOTSTRAP_SERVER = "127.0.0.1:8888"; - public final String FETCH_THRESHOLD_MSGS = "50000"; - public final String FETCH_THRESHOLD_BYTES = "100000"; - - private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) { - final Map<String, String> map = new HashMap<>(); - - map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); - map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes); - map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), - BOOTSTRAP_SERVER); - map.put(JobConfig.JOB_NAME(), "jobName"); - - Config config = new MapConfig(map); - KafkaConsumerConfig consumerConfig = - KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID); - final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig); - - MockKafkaSystemConsumer newKafkaSystemConsumer = - new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, - new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); - - return newKafkaSystemConsumer; - } - - @Test - public void testConfigValidations() { - - final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); - - consumer.start(); - // should be no failures - } - - @Test - public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { - final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); - final int partitionsNum = 50; - for (int i = 0; i < partitionsNum; i++) { - consumer.register(new SystemStreamPartition(TEST_SYSTEM, 
TEST_STREAM, new Partition(i)), "0"); - } - - consumer.start(); - - Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold); - Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, - consumer.perPartitionFetchThresholdBytes); - - consumer.stop(); - } - - @Test - public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { - - KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); - - SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); - SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); - SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2)); - - consumer.register(ssp0, "0"); - consumer.register(ssp0, "5"); - consumer.register(ssp1, "2"); - consumer.register(ssp1, "3"); - consumer.register(ssp2, "0"); - - assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0))); - assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1))); - assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2))); - } - - @Test - public void testFetchThresholdBytes() { - - SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); - SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); - int partitionsNum = 2; - int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size - int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size - int ime11Size = 20; - ByteArraySerializer bytesSerde = new ByteArraySerializer(); - IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), - bytesSerde.serialize("", "value0".getBytes()), ime0Size); - IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), - bytesSerde.serialize("", "value1".getBytes()), ime1Size); - IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), - bytesSerde.serialize("", "value11".getBytes()), ime11Size); - KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); - - consumer.register(ssp0, "0"); - consumer.register(ssp1, "0"); - consumer.start(); - consumer.messageSink.addMessage(ssp0, ime0); - // queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum - Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0)); - consumer.messageSink.addMessage(ssp1, ime1); - // queue for ssp1 should be less then full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1) - Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); - consumer.messageSink.addMessage(ssp1, ime11); - // queue for ssp1 should full now, because we added message of size 20 on top - Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); - - Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); - Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); - Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); - Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); - - consumer.stop(); - } - - @Test - public void 
testFetchThresholdBytesDiabled() { - // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size - - SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); - SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); - int partitionsNum = 2; - int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit - int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit - int ime11Size = 20;// event with the second message still below the size limit - ByteArraySerializer bytesSerde = new ByteArraySerializer(); - IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), - bytesSerde.serialize("", "value0".getBytes()), ime0Size); - IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), - bytesSerde.serialize("", "value1".getBytes()), ime1Size); - IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), - bytesSerde.serialize("", "value11".getBytes()), ime11Size); - - // limit by number of messages 4/2 = 2 per partition - // limit by number of bytes - disabled - KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable - - consumer.register(ssp0, "0"); - consumer.register(ssp1, "0"); - consumer.start(); - consumer.messageSink.addMessage(ssp0, ime0); - // should be full by size, but not full by number of messages (1 of 2) - Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0)); - consumer.messageSink.addMessage(ssp1, ime1); - // not full neither by size nor by messages - Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); - consumer.messageSink.addMessage(ssp1, ime11); - // not full by size, but should be full by messages - Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); - - Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); - Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); - Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); - Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); - - consumer.stop(); - } - - // mock kafkaConsumer and SystemConsumer - static class MockKafkaConsumer extends KafkaConsumer { - public MockKafkaConsumer(Map<String, Object> configs) { - super(configs); - } - } - - static class MockKafkaSystemConsumer extends KafkaSystemConsumer { - public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, - KafkaSystemConsumerMetrics metrics, Clock clock) { - super(kafkaConsumer, systemName, config, clientId, metrics, clock); - } - - @Override - void startConsumer() { - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java index 340f0e7..fc5e75a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java @@ -73,7 +73,7 @@ public 
class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe String inputTopicName2 = "ad-clicks"; String outputTopicName = "user-ad-click-counts"; - KafkaSystemAdmin.deleteMessagesCalled_$eq(false); + KafkaSystemAdmin.deleteMessageCalled = false; initializeTopics(inputTopicName1, inputTopicName2, outputTopicName); @@ -95,7 +95,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2); assertEquals(2, messages.size()); - Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled()); + Assert.assertFalse(KafkaSystemAdmin.deleteMessageCalled); } @Test @@ -133,18 +133,20 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe // Verify that messages in the intermediate stream will be deleted in 10 seconds long startTimeMs = System.currentTimeMillis(); - for (String streamId: app.getIntermediateStreamIds()) { + for (String streamId : app.getIntermediateStreamIds()) { long remainingMessageNum = -1; while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) { remainingMessageNum = 0; - SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata( - new HashSet<>(Arrays.asList(streamId)), new ExponentialSleepStrategy.Mock(3) - ).get(streamId).get(); + SystemStreamMetadata metadatas = + (SystemStreamMetadata) systemAdmin.getSystemStreamMetadata(new HashSet<>(Arrays.asList(streamId)), + new ExponentialSleepStrategy.Mock(3)).get(streamId); - for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) { + for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata() + .entrySet()) { SystemStreamPartitionMetadata metadata = entry.getValue(); - remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset()); + remainingMessageNum += + Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset()); } } assertEquals(0, remainingMessageNum); http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala index bc00305..2f18875 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala @@ -17,36 +17,39 @@ * under the License. */ package org.apache.samza.test.harness + import java.util.Properties import kafka.server.KafkaConfig -import kafka.utils.{TestUtils, ZkUtils} -import org.apache.kafka.common.security.JaasUtils -import org.apache.samza.system.kafka.KafkaSystemAdmin +import kafka.utils.TestUtils +import org.apache.samza.config.{JobConfig, KafkaConsumerConfig, MapConfig} +import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaSystemConsumer} /** - * LinkedIn integration test harness for Kafka - * This is simply a copy of open source code. We do this because java does not support trait and we are making it an - * abstract class so that user's java test class can extend it. 
- */ + * Integration test harness for Kafka. + * Declared as an abstract class rather than a trait so that users' Java test + * classes can extend it. + */ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHarness { def generateConfigs() = TestUtils.createBrokerConfigs(clusterSize(), zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps())) /** - * User can override this method to return the number of brokers they want. - * By default only one broker will be launched. - * @return the number of brokers needed in the Kafka cluster for the test. - */ + * User can override this method to return the number of brokers they want. + * By default only one broker will be launched. + * + * @return the number of brokers needed in the Kafka cluster for the test. + */ def clusterSize(): Int = 1 /** - * User can override this method to apply customized configurations to the brokers. - * By default the only configuration is number of partitions when topics get automatically created. The default value - * is 1. - * @return The configurations to be used by brokers. - */ + * User can override this method to apply customized configurations to the brokers. + * By default the only configuration is number of partitions when topics get automatically created. The default value + * is 1. + * + * @return The configurations to be used by brokers. + */ def overridingProps(): Properties = { val props = new Properties() props.setProperty(KafkaConfig.NumPartitionsProp, 1.toString) @@ -54,13 +57,30 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar } /** - * Returns the bootstrap servers configuration string to be used by clients. - * @return bootstrap servers string. - */ + * Returns the bootstrap servers configuration string to be used by clients. + * + * @return bootstrap servers string. + */ def bootstrapServers(): String = super.bootstrapUrl def createSystemAdmin(system: String): KafkaSystemAdmin = { - new KafkaSystemAdmin(system, bootstrapServers, connectZk = () => ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled)) + + val map: java.util.Map[String, String] = new java.util.HashMap() + + val KAFKA_CONSUMER_PROPERTY_PREFIX: String = "systems." + system + ".consumer." + val KAFKA_PRODUCER_PROPERTY_PREFIX: String = "systems." + system + ".producer." + + map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + map.put(JobConfig.JOB_NAME, "test.job") + + map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + + KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect) + + val config = new MapConfig(map) + val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system, KafkaConsumerConfig.createClientId("kafka-admin-consumer", config)) + + new KafkaSystemAdmin(system, config, KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig)) } } \ No newline at end of file
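A closing note on the offset semantics the new mock-based tests pin down: KafkaSystemAdmin is expected to map the consumer's beginningOffsets to the oldest offset, endOffsets to the upcoming offset, and endOffset - 1 to the newest offset, with newest left null for an empty partition. A minimal sketch of that mapping as the assertions describe it (the helper name is illustrative, not from the commit):

  // oldest = beginningOffset, upcoming = endOffset, newest = endOffset - 1;
  // an empty partition (endOffset == 0) has no newest offset.
  static SystemStreamMetadata.SystemStreamPartitionMetadata partitionMetadata(long beginningOffset, long endOffset) {
    String newest = (endOffset == 0) ? null : String.valueOf(endOffset - 1);
    return new SystemStreamMetadata.SystemStreamPartitionMetadata(
        String.valueOf(beginningOffset), newest, String.valueOf(endOffset));
  }

The consumer-side tests similarly pin down the fetch-threshold split: each registered partition gets fetchThresholdMsgs / numPartitions messages and (fetchThresholdBytes / 2) / numPartitions bytes, and a fetchThresholdBytes value of 0 disables the size check entirely.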
