Repository: samza
Updated Branches:
  refs/heads/master 3c78e06ac -> 63d33fa06


http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
new file mode 100644
index 0000000..6a03198
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import kafka.admin.AdminClient;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.apache.samza.system.kafka.KafkaSystemDescriptor.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestKafkaSystemAdminWithMock {
+  private KafkaSystemAdmin kafkaSystemAdmin;
+  private Config testConfig;
+  private Consumer<byte[], byte[]> mockKafkaConsumer;
+  private PartitionInfo mockPartitionInfo0;
+  private PartitionInfo mockPartitionInfo1;
+  private TopicPartition testTopicPartition0;
+  private TopicPartition testTopicPartition1;
+
+  private ConcurrentHashMap<String, KafkaSystemConsumer> consumersReference;
+
+  private static final String VALID_TOPIC = "validTopic";
+  private static final String INVALID_TOPIC = "invalidTopic";
+  private static final String TEST_SYSTEM = "testSystem";
+  private static final Long KAFKA_BEGINNING_OFFSET_FOR_PARTITION0 = 10L;
+  private static final Long KAFKA_BEGINNING_OFFSET_FOR_PARTITION1 = 11L;
+  private static final Long KAFKA_END_OFFSET_FOR_PARTITION0 = 20L;
+  private static final Long KAFKA_END_OFFSET_FOR_PARTITION1 = 21L;
+
+  @Before
+  public void setUp() throws Exception {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        "localhost:123");
+    configMap.put(String.format(KafkaConfig.CONSUMER_ZK_CONNECT_CONFIG_KEY(), TEST_SYSTEM), "localhost:124");
+    configMap.put(JobConfig.JOB_NAME(), "jobName");
+    configMap.put(JobConfig.JOB_ID(), "jobId");
+
+    testConfig = new MapConfig(configMap);
+
+    consumersReference = new ConcurrentHashMap<>();
+
+    // mock PartitionInfo
+    mockPartitionInfo0 = mock(PartitionInfo.class);
+    when(mockPartitionInfo0.topic()).thenReturn(VALID_TOPIC);
+    when(mockPartitionInfo0.partition()).thenReturn(0);
+    mockPartitionInfo1 = mock(PartitionInfo.class);
+    when(mockPartitionInfo1.topic()).thenReturn(VALID_TOPIC);
+    when(mockPartitionInfo1.partition()).thenReturn(1);
+
+    // mock the KafkaConsumer
+    mockKafkaConsumer = mock(KafkaConsumer.class);
+
+    // mock other KafkaConsumer behaviors
+    testTopicPartition0 = new TopicPartition(VALID_TOPIC, 0);
+    testTopicPartition1 = new TopicPartition(VALID_TOPIC, 1);
+    Map<TopicPartition, Long> testBeginningOffsets =
+        ImmutableMap.of(testTopicPartition0, KAFKA_BEGINNING_OFFSET_FOR_PARTITION0, testTopicPartition1,
+            KAFKA_BEGINNING_OFFSET_FOR_PARTITION1);
+    Map<TopicPartition, Long> testEndOffsets =
+        ImmutableMap.of(testTopicPartition0, KAFKA_END_OFFSET_FOR_PARTITION0, testTopicPartition1,
+            KAFKA_END_OFFSET_FOR_PARTITION1);
+
+    when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenReturn(
+        ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1));
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn(
+        testBeginningOffsets);
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn(
+        testEndOffsets);
+
+    kafkaSystemAdmin = new KafkaSystemAdmin(TEST_SYSTEM, testConfig, mockKafkaConsumer);
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  @Test
+  public void testGetSystemStreamMetaDataWithValidTopic() {
+    Map<String, SystemStreamMetadata> metadataMap =
+        kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
+
+    // verify metadata size
+    assertEquals("metadata should return for 1 topic", 1, metadataMap.size());
+
+    // verify the metadata streamName
+    assertEquals("the stream name should be " + VALID_TOPIC, VALID_TOPIC, metadataMap.get(VALID_TOPIC).getStreamName());
+
+    // verify the offset for each partition
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> systemStreamPartitionMetadata =
+        metadataMap.get(VALID_TOPIC).getSystemStreamPartitionMetadata();
+    assertEquals("there are 2 partitions", 2, systemStreamPartitionMetadata.size());
+
+    SystemStreamMetadata.SystemStreamPartitionMetadata partition0Metadata =
+        systemStreamPartitionMetadata.get(new Partition(0));
+    assertEquals("oldest offset for partition 0", KAFKA_BEGINNING_OFFSET_FOR_PARTITION0.toString(),
+        partition0Metadata.getOldestOffset());
+    assertEquals("upcoming offset for partition 0", KAFKA_END_OFFSET_FOR_PARTITION0.toString(),
+        partition0Metadata.getUpcomingOffset());
+    assertEquals("newest offset for partition 0", Long.toString(KAFKA_END_OFFSET_FOR_PARTITION0 - 1),
+        partition0Metadata.getNewestOffset());
+
+    SystemStreamMetadata.SystemStreamPartitionMetadata partition1Metadata =
+        systemStreamPartitionMetadata.get(new Partition(1));
+    assertEquals("oldest offset for partition 1", KAFKA_BEGINNING_OFFSET_FOR_PARTITION1.toString(),
+        partition1Metadata.getOldestOffset());
+    assertEquals("upcoming offset for partition 1", KAFKA_END_OFFSET_FOR_PARTITION1.toString(),
+        partition1Metadata.getUpcomingOffset());
+    assertEquals("newest offset for partition 1", Long.toString(KAFKA_END_OFFSET_FOR_PARTITION1 - 1),
+        partition1Metadata.getNewestOffset());
+  }
+
+  @Test
+  public void testGetSystemStreamMetaDataWithInvalidTopic() {
+    Map<String, SystemStreamMetadata> metadataMap =
+        kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(INVALID_TOPIC));
+    assertEquals("empty metadata for invalid topic", 0, metadataMap.size());
+  }
+
+  @Test
+  public void testGetSystemStreamMetaDataWithNoTopic() {
+    Map<String, SystemStreamMetadata> metadataMap = kafkaSystemAdmin.getSystemStreamMetadata(Collections.emptySet());
+    assertEquals("empty metadata for no topic", 0, metadataMap.size());
+  }
+
+  @Test
+  public void testGetSystemStreamMetaDataForTopicWithNoMessage() {
+    // a topic with no messages will have beginningOffset = 0 and endOffset = 0
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn(
+        ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L));
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn(
+        ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L));
+
+    Map<String, SystemStreamMetadata> metadataMap =
+        kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
+    assertEquals("metadata should return for 1 topic", 1, metadataMap.size());
+
+    // verify the metadata streamName
+    assertEquals("the stream name should be " + VALID_TOPIC, VALID_TOPIC, metadataMap.get(VALID_TOPIC).getStreamName());
+
+    // verify the offset for each partition
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> systemStreamPartitionMetadata =
+        metadataMap.get(VALID_TOPIC).getSystemStreamPartitionMetadata();
+    assertEquals("there are 2 partitions", 2, systemStreamPartitionMetadata.size());
+
+    SystemStreamMetadata.SystemStreamPartitionMetadata partition0Metadata =
+        systemStreamPartitionMetadata.get(new Partition(0));
+    assertEquals("oldest offset for partition 0", "0", partition0Metadata.getOldestOffset());
+    assertEquals("upcoming offset for partition 0", "0", partition0Metadata.getUpcomingOffset());
+    assertNull("newest offset is not set for an empty partition", partition0Metadata.getNewestOffset());
+
+    SystemStreamMetadata.SystemStreamPartitionMetadata partition1Metadata =
+        systemStreamPartitionMetadata.get(new Partition(1));
+    assertEquals("oldest offset for partition 1", "0", partition1Metadata.getOldestOffset());
+    assertEquals("upcoming offset for partition 1", "0", partition1Metadata.getUpcomingOffset());
+    assertNull("newest offset is not set for an empty partition", partition1Metadata.getNewestOffset());
+  }
+
+  @Test
+  public void testGetSSPMetadata() {
+    SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
+    SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1));
+    TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0);
+    TopicPartition otherTopicPartition = new TopicPartition("otherTopic", 1);
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 1L, otherTopicPartition, 2L));
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 11L, otherTopicPartition, 12L));
+    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected =
+        ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("1", "10", "11"), otherSSP,
+            new SystemStreamMetadata.SystemStreamPartitionMetadata("2", "11", "12"));
+    assertEquals(expected, kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp, otherSSP)));
+  }
+
+  @Test
+  public void testGetSSPMetadataEmptyPartition() {
+    SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
+    SystemStreamPartition otherSSP = new SystemStreamPartition(TEST_SYSTEM, "otherTopic", new Partition(1));
+    TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0);
+    TopicPartition otherTopicPartition = new TopicPartition("otherTopic", 1);
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 1L));
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition, otherTopicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 11L));
+
+    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected =
+        ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("1", "10", "11"), otherSSP,
+            new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
+    assertEquals(expected, kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp, otherSSP)));
+  }
+
+  @Test
+  public void testGetSSPMetadataEmptyUpcomingOffset() {
+    SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
+    TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0);
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 0L));
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition))).thenReturn(ImmutableMap.of());
+    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected =
+        ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("0", null, null));
+    assertEquals(expected, kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp)));
+  }
+
+  @Test
+  public void testGetSSPMetadataZeroUpcomingOffset() {
+    SystemStreamPartition ssp = new SystemStreamPartition(TEST_SYSTEM, VALID_TOPIC, new Partition(0));
+    TopicPartition topicPartition = new TopicPartition(VALID_TOPIC, 0);
+    when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(topicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, -1L));
+    when(mockKafkaConsumer.endOffsets(ImmutableList.of(topicPartition))).thenReturn(
+        ImmutableMap.of(topicPartition, 0L));
+    Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> expected =
+        ImmutableMap.of(ssp, new SystemStreamMetadata.SystemStreamPartitionMetadata("0", null, "0"));
+    assertEquals(expected, kafkaSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp)));
+  }
+
+  @Test
+  public void testGetSystemStreamMetaDataWithRetry() {
+    final List<PartitionInfo> partitionInfosForTopic = ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1);
+    when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
+        .thenReturn(partitionInfosForTopic);
+
+    Map<String, SystemStreamMetadata> metadataMap =
+        kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
+    assertEquals("metadata should return for 1 topic", 1, metadataMap.size());
+
+    // partitionsFor() is called twice: the first call fails and the retry succeeds
+    Mockito.verify(mockKafkaConsumer, Mockito.times(2)).partitionsFor(VALID_TOPIC);
+
+    final List<TopicPartition> topicPartitions =
+        Arrays.asList(new TopicPartition(mockPartitionInfo0.topic(), mockPartitionInfo0.partition()),
+            new TopicPartition(mockPartitionInfo1.topic(), mockPartitionInfo1.partition()));
+    // the subsequent methods are only called once
+    Mockito.verify(mockKafkaConsumer, Mockito.times(1)).beginningOffsets(topicPartitions);
+    Mockito.verify(mockKafkaConsumer, Mockito.times(1)).endOffsets(topicPartitions);
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetSystemStreamMetadataShouldTerminateAfterFiniteRetriesOnException() {
+    when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException());
+
+    kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetSystemStreamPartitionCountsShouldTerminateAfterFiniteRetriesOnException() throws Exception {
+    final Set<String> streamNames = ImmutableSet.of(VALID_TOPIC);
+    final long cacheTTL = 100L;
+
+    when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException())
+        .thenThrow(new RuntimeException());
+
+    kafkaSystemAdmin.getSystemStreamPartitionCounts(streamNames, cacheTTL);
+  }
+}
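
The assertions above pin down how KafkaSystemAdmin maps the raw offsets returned by the Kafka consumer onto Samza's partition metadata: the oldest offset is the beginning offset (clamped at 0), the upcoming offset is the end offset, and the newest offset is the end offset minus one, left unset while the partition has never held a message. A minimal sketch of that mapping as the tests describe it (a hypothetical helper, not the actual KafkaSystemAdmin internals):

    import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;

    final class OffsetMapping {
      // Mirrors the behavior asserted by the mock-based tests above.
      static SystemStreamPartitionMetadata fromKafkaOffsets(Long beginningOffset, Long endOffset) {
        // oldest = beginning offset, clamped at 0 (see testGetSSPMetadataZeroUpcomingOffset)
        String oldest = beginningOffset == null ? null : Long.toString(Math.max(beginningOffset, 0));
        // upcoming = end offset, i.e. the offset the next message will receive
        String upcoming = endOffset == null ? null : Long.toString(endOffset);
        // newest = end offset - 1; undefined while the partition is empty
        String newest = (endOffset == null || endOffset <= 0) ? null : Long.toString(endOffset - 1);
        return new SystemStreamPartitionMetadata(oldest, newest, upcoming);
      }
    }

For example, a beginning offset of 1 and an end offset of 11 yield ("1", "10", "11"), matching the expected metadata in testGetSSPMetadata.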

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
new file mode 100644
index 0000000..981ac45
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.KafkaConsumerConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestKafkaSystemConsumer {
+  public final String TEST_SYSTEM = "test-system";
+  public final String TEST_STREAM = "test-stream";
+  public final String TEST_JOB = "test-job";
+  public final String TEST_PREFIX_ID = "testClientId";
+  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
+  public final String FETCH_THRESHOLD_MSGS = "50000";
+  public final String FETCH_THRESHOLD_BYTES = "100000";
+
+  private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
+    final Map<String, String> map = new HashMap<>();
+
+    map.put(JobConfig.JOB_NAME(), TEST_JOB);
+
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
+    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
+    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+        BOOTSTRAP_SERVER);
+
+    Config config = new MapConfig(map);
+    String clientId = KafkaConsumerConfig.createClientId(TEST_PREFIX_ID, config);
+    KafkaConsumerConfig consumerConfig =
+        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, clientId);
+
+    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
+
+    return new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_PREFIX_ID,
+        new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
+  }
+
+  @Test
+  public void testConfigValidations() {
+
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.start();
+    // should be no failures
+  }
+
+  @Test
+  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
+    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+    final int partitionsNum = 50;
+    for (int i = 0; i < partitionsNum; i++) {
+      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
+    }
+
+    consumer.start();
+
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
+    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
+        consumer.perPartitionFetchThresholdBytes);
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
+
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp0, "5");
+    consumer.register(ssp1, "2");
+    consumer.register(ssp1, "3");
+    consumer.register(ssp2, "0");
+
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
+    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
+    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+  }
+
+  @Test
+  public void testFetchThresholdBytes() {
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size
+    int ime11Size = 20;
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // the queue for ssp0 should be full now, because we added a message of size FETCH_THRESHOLD_MSGS/partitionsNum
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // the queue for ssp1 should be less than full, because we added a message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // the queue for ssp1 should be full now, because we added a message of size 20 on top
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  @Test
+  public void testFetchThresholdBytesDisabled() {
+    // Pass 0 as the fetchThresholdBytes, which disables checking for the limit by size
+
+    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
+    int partitionsNum = 2;
+    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit
+    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
+    int ime11Size = 20; // even with the second message still below the size limit
+    ByteArraySerializer bytesSerde = new ByteArraySerializer();
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
+        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
+        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
+    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
+        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
+
+    // limit by number of messages: 4/2 = 2 per partition
+    // limit by number of bytes: disabled
+    KafkaSystemConsumer consumer = createConsumer("4", "0");
+
+    consumer.register(ssp0, "0");
+    consumer.register(ssp1, "0");
+    consumer.start();
+    consumer.messageSink.addMessage(ssp0, ime0);
+    // would be full by size if the byte limit were enabled, but not full by number of messages (1 of 2)
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp0));
+    consumer.messageSink.addMessage(ssp1, ime1);
+    // not full by size or by number of messages
+    Assert.assertTrue(consumer.messageSink.needsMoreMessages(ssp1));
+    consumer.messageSink.addMessage(ssp1, ime11);
+    // not full by size, but full by number of messages (2 of 2)
+    Assert.assertFalse(consumer.messageSink.needsMoreMessages(ssp1));
+
+    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
+    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
+    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
+    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
+
+    consumer.stop();
+  }
+
+  // mock kafkaConsumer and SystemConsumer
+  static class MockKafkaConsumer extends KafkaConsumer {
+    public MockKafkaConsumer(Map<String, Object> configs) {
+      super(configs);
+    }
+  }
+
+  static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
+    public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
+        KafkaSystemConsumerMetrics metrics, Clock clock) {
+      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
+    }
+
+    @Override
+    void startConsumer() {
+    }
+  }
+}
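
Taken together, the threshold tests fix the per-partition budgeting: the message threshold is divided evenly across all registered partitions, while the byte threshold is divided by two before being split (only the division is asserted here; that the reserved half is for in-flight data is an assumption about intent). A sketch of the arithmetic, with illustrative names:

    final class FetchThresholds {
      // message budget is split evenly across registered partitions
      static long perPartitionMessages(long fetchThresholdMsgs, int numPartitions) {
        return fetchThresholdMsgs / numPartitions;      // 50000 / 50 = 1000
      }

      // byte budget is halved first, then split across partitions
      static long perPartitionBytes(long fetchThresholdBytes, int numPartitions) {
        return fetchThresholdBytes / 2 / numPartitions; // 100000 / 2 / 50 = 1000
      }
    }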

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
new file mode 100644
index 0000000..03b0564
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumerMetrics.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import kafka.common.TopicAndPartition;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestKafkaSystemConsumerMetrics {
+  @Test
+  public void testKafkaSystemConsumerMetrics() {
+    String systemName = "system";
+    TopicAndPartition tp1 = new TopicAndPartition("topic1", 1);
+    TopicAndPartition tp2 = new TopicAndPartition("topic2", 2);
+    String clientName = "clientName";
+
+    // record expected values for further comparison
+    Map<String, String> expectedValues = new HashMap<>();
+
+    ReadableMetricsRegistry registry = new MetricsRegistryMap();
+    KafkaSystemConsumerMetrics metrics = new KafkaSystemConsumerMetrics(systemName, registry);
+
+    // initialize the metrics for the partitions
+    metrics.registerTopicAndPartition(tp1);
+    metrics.registerTopicAndPartition(tp2);
+
+    // initialize the metrics for the client proxy
+    metrics.registerClientProxy(clientName);
+
+    metrics.setOffsets(tp1, 1001);
+    metrics.setOffsets(tp2, 1002);
+    expectedValues.put(metrics.offsets().get(tp1).getName(), "1001");
+    expectedValues.put(metrics.offsets().get(tp2).getName(), "1002");
+
+    metrics.incBytesReads(tp1, 10);
+    metrics.incBytesReads(tp1, 5); // total 15
+    expectedValues.put(metrics.bytesRead().get(tp1).getName(), "15");
+
+    metrics.incReads(tp1);
+    metrics.incReads(tp1); // total 2
+    expectedValues.put(metrics.reads().get(tp1).getName(), "2");
+
+    metrics.setHighWatermarkValue(tp2, 1000);
+    metrics.setHighWatermarkValue(tp2, 1001); // final value 1001
+    expectedValues.put(metrics.highWatermark().get(tp2).getName(), "1001");
+
+    metrics.setLagValue(tp1, 200);
+    metrics.setLagValue(tp1, 201); // final value 201
+    expectedValues.put(metrics.lag().get(tp1).getName(), "201");
+
+    metrics.incClientBytesReads(clientName, 100); // broker-bytes-read
+    metrics.incClientBytesReads(clientName, 110); // total 210
+    expectedValues.put(metrics.clientBytesRead().get(clientName).getName(), "210");
+
+    metrics.incClientReads(clientName); // messages-read
+    metrics.incClientReads(clientName); // total 2
+    expectedValues.put(metrics.clientReads().get(clientName).getName(), "2");
+
+    metrics.setNumTopicPartitions(clientName, 2); // "topic-partitions"
+    metrics.setNumTopicPartitions(clientName, 3); // final value 3
+    expectedValues.put(metrics.topicPartitions().get(clientName).getName(), "3");
+
+    String groupName = metrics.group();
+    Assert.assertEquals(KafkaSystemConsumerMetrics.class.getName(), groupName);
+    Assert.assertEquals(systemName, metrics.systemName());
+
+    Map<String, Metric> metricMap = registry.getGroup(groupName);
+    validate(metricMap, expectedValues);
+  }
+
+  protected static void validate(Map<String, Metric> metricMap, Map<String, String> expectedValues) {
+    // compare each expected value, set in the test above, against the value in the metrics registry
+    for (Map.Entry<String, String> e : expectedValues.entrySet()) {
+      String metricName = e.getKey();
+      String expectedValue = e.getValue();
+      // get the metric from the registry
+      String actualValue = metricMap.get(metricName).toString();
+
+      Assert.assertEquals("failed for metricName=" + metricName, expectedValue, actualValue);
+    }
+  }
+}
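
The validate() helper leans on one property of the Samza metrics API: a metric created under a group can be looked up again through ReadableMetricsRegistry.getGroup(group) by its name. A condensed, hypothetical round trip showing the same pattern outside of KafkaSystemConsumerMetrics:

    import org.apache.samza.metrics.Counter;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class MetricsRoundTrip {
      public static void main(String[] args) {
        MetricsRegistryMap registry = new MetricsRegistryMap();
        // register a counter under a group, then mutate it
        Counter counter = registry.newCounter("my-group", "messages-read");
        counter.inc();
        counter.inc();
        // read the same metric back by group and name
        Counter readBack = (Counter) registry.getGroup("my-group").get("messages-read");
        System.out.println(readBack.getCount()); // 2
      }
    }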

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index cd511f2..1570363 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,22 +21,18 @@
 
 package org.apache.samza.system.kafka
 
-import java.util.{Properties, UUID}
-
 import kafka.admin.AdminUtils
-import org.apache.kafka.common.errors.LeaderNotAvailableException
-import org.apache.kafka.common.protocol.Errors
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
+import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.samza.Partition
-import org.apache.samza.config.KafkaProducerConfig
+import org.apache.samza.config._
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
-import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
 import org.junit._
 
@@ -53,6 +49,9 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   val TOTAL_PARTITIONS = 50
   val REPLICATION_FACTOR = 2
   val zkSecure = JaasUtils.isZkSecurityEnabled()
+  val KAFKA_CONSUMER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".consumer."
+  val KAFKA_PRODUCER_PROPERTY_PREFIX: String = "systems." + SYSTEM + ".producer."
+
 
   protected def numBrokers: Int = 3
 
@@ -69,14 +68,19 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   @BeforeClass
   override def setUp() {
     super.setUp()
-    val config = new java.util.HashMap[String, String]()
-    config.put("bootstrap.servers", brokerList)
-    config.put("acks", "all")
-    config.put("serializer.class", "kafka.serializer.StringEncoder")
-    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+    val map = new java.util.HashMap[String, String]()
+    map.put("bootstrap.servers", brokerList)
+    map.put(KAFKA_CONSUMER_PROPERTY_PREFIX +
+      org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    map.put("acks", "all")
+    map.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    producerConfig = new KafkaProducerConfig("kafka", "i001", map)
     producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
     metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
-    systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    systemAdmin = createSystemAdmin(SYSTEM, map)
+
     systemAdmin.start()
   }
 
@@ -122,7 +126,7 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   }
 
   def getConsumerConnector(): ConsumerConnector = {
-    val props = new Properties
+    val props = new java.util.Properties
 
     props.put("zookeeper.connect", zkConnect)
     props.put("group.id", "test")
@@ -132,18 +136,36 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
     Consumer.create(consumerConfig)
   }
 
-  def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
-    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
-      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false)
-  }
+  def createSystemAdmin(system: String, map: java.util.Map[String, String]) = {
+    // required configs: bootstrap servers, zookeeper connect and job name
+    map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+      brokerList)
+    map.put(KAFKA_PRODUCER_PROPERTY_PREFIX + org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+      brokerList)
+    map.put(JobConfig.JOB_NAME, "job.name")
+    map.put(KAFKA_CONSUMER_PROPERTY_PREFIX + KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect)
+
+    val config: Config = new MapConfig(map)
+    val res = KafkaSystemAdminUtilsScala.getIntermediateStreamProperties(config)
+
+    val clientId = KafkaConsumerConfig.createClientId("clientPrefix", config)
+    // extract the kafka client configs
+    val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system, clientId)
+
+    new KafkaSystemAdmin(
+      system,
+      config,
+      KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig))
+  }
 }
 
 /**
- * Test creates a local ZK and Kafka cluster, and uses it to create and test
- * topics for to verify that offset APIs in SystemAdmin work as expected.
- */
+  * Test creates a local ZK and Kafka cluster, and uses it to create and test
+  * topics to verify that offset APIs in SystemAdmin work as expected.
+  */
 class TestKafkaSystemAdmin {
+
   import TestKafkaSystemAdmin._
 
   @Test
@@ -163,7 +185,7 @@ class TestKafkaSystemAdmin {
       new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
       new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
       new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
-    val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+    val metadata = KafkaSystemAdminUtilsScala.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
     assertTrue(metadata.contains("stream1"))
@@ -271,7 +293,9 @@ class TestKafkaSystemAdmin {
   @Test
   def testShouldCreateCoordinatorStream {
     val topic = "test-coordinator-stream"
-    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+    val map = new java.util.HashMap[String, String]()
+    map.put(org.apache.samza.config.KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "3")
+    val systemAdmin = createSystemAdmin(SYSTEM, map)
 
     val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
     systemAdmin.createStream(spec)
@@ -284,30 +308,6 @@ class TestKafkaSystemAdmin {
     assertEquals(3, partitionMetadata.replicas.size)
   }
 
-  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
-    import kafka.api.TopicMetadata
-    var metadataCallCount = 0
-
-    // Simulate Kafka telling us that the leader for the topic is not available
-    override def getTopicMetadata(topics: Set[String]) = {
-      metadataCallCount += 1
-      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
-      Map("quux" -> topicMetadata)
-    }
-  }
-
-  @Test
-  def testShouldRetryOnTopicMetadataError {
-    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
-    val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
-    try {
-      systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff)
-      fail("expected CallLimitReached to be thrown")
-    } catch {
-      case e: ExponentialSleepStrategy.CallLimitReached => ()
-    }
-  }
-
   @Test
   def testGetNewestOffset {
     createTopic(TOPIC2, 16)
@@ -335,17 +335,4 @@ class TestKafkaSystemAdmin {
     assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
     assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
   }
-
-  @Test (expected = classOf[LeaderNotAvailableException])
-  def testGetNewestOffsetMaxRetry {
-    val expectedRetryCount = 3
-    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
-    try {
-      systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
-    } catch {
-      case e: Exception =>
-        assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
-        throw e
-    }
-  }
-}
+}
\ No newline at end of file
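
The thrust of this refactor is visible in createSystemAdmin: the old constructor took a broker list plus a ZkUtils factory, while the new KafkaSystemAdmin is built from a Config and a ready-made KafkaConsumer. A hedged Java sketch of the same wiring (addresses and names are placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.JobConfig;
    import org.apache.samza.config.KafkaConsumerConfig;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.system.kafka.KafkaSystemAdmin;
    import org.apache.samza.system.kafka.KafkaSystemConsumer;

    public class AdminWiring {
      static KafkaSystemAdmin createAdmin(String system) {
        Map<String, String> map = new HashMap<>();
        // system-scoped consumer properties; the addresses are placeholders
        map.put("systems." + system + ".consumer.bootstrap.servers", "localhost:9092");
        map.put("systems." + system + ".consumer.zookeeper.connect", "localhost:2181");
        map.put(JobConfig.JOB_NAME(), "test-job");

        Config config = new MapConfig(map);
        String clientId = KafkaConsumerConfig.createClientId("admin", config);
        KafkaConsumerConfig consumerConfig =
            KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system, clientId);
        // the admin now owns a real consumer instance instead of a ZK connection
        return new KafkaSystemAdmin(system, config,
            KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig));
      }
    }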

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
deleted file mode 100644
index 5791545..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.samza.system.kafka;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.KafkaConsumerConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.NoOpMetricsRegistry;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-public class TestKafkaSystemConsumer {
-  public final String TEST_SYSTEM = "test-system";
-  public final String TEST_STREAM = "test-stream";
-  public final String TEST_CLIENT_ID = "testClientId";
-  public final String BOOTSTRAP_SERVER = "127.0.0.1:8888";
-  public final String FETCH_THRESHOLD_MSGS = "50000";
-  public final String FETCH_THRESHOLD_BYTES = "100000";
-
-  private KafkaSystemConsumer createConsumer(String fetchMsg, String fetchBytes) {
-    final Map<String, String> map = new HashMap<>();
-
-    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg);
-    map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes);
-    map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-        BOOTSTRAP_SERVER);
-    map.put(JobConfig.JOB_NAME(), "jobName");
-
-    Config config = new MapConfig(map);
-    KafkaConsumerConfig consumerConfig =
-        KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID);
-    final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig);
-
-    MockKafkaSystemConsumer newKafkaSystemConsumer =
-        new MockKafkaSystemConsumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID,
-            new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis);
-
-    return newKafkaSystemConsumer;
-  }
-
-  @Test
-  public void testConfigValidations() {
-
-    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-
-    consumer.start();
-    // should be no failures
-  }
-
-  @Test
-  public void testFetchThresholdShouldDivideEvenlyAmongPartitions() {
-    final KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-    final int partitionsNum = 50;
-    for (int i = 0; i < partitionsNum; i++) {
-      consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0");
-    }
-
-    consumer.start();
-
-    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold);
-    Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum,
-        consumer.perPartitionFetchThresholdBytes);
-
-    consumer.stop();
-  }
-
-  @Test
-  public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
-
-    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2));
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp0, "5");
-    consumer.register(ssp1, "2");
-    consumer.register(ssp1, "3");
-    consumer.register(ssp2, "0");
-
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
-  }
-
-  @Test
-  public void testFetchThresholdBytes() {
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
-    int partitionsNum = 2;
-    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size
-    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size
-    int ime11Size = 20;
-    ByteArraySerializer bytesSerde = new ByteArraySerializer();
-    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
-        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
-    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
-        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
-    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
-        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
-    KafkaSystemConsumer consumer = createConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES);
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp1, "0");
-    consumer.start();
-    consumer.messageSink.addMessage(ssp0, ime0);
-    // queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0));
-    consumer.messageSink.addMessage(ssp1, ime1);
-    // queue for ssp1 should be less then full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
-    consumer.messageSink.addMessage(ssp1, ime11);
-    // queue for ssp1 should full now, because we added message of size 20 on top
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
-
-    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
-    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
-    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
-    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
-
-    consumer.stop();
-  }
-
-  @Test
-  public void testFetchThresholdBytesDiabled() {
-    // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size
-
-    SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0));
-    SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1));
-    int partitionsNum = 2;
-    int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, upto the limit
-    int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit
-    int ime11Size = 20;// event with the second message still below the size limit
-    ByteArraySerializer bytesSerde = new ByteArraySerializer();
-    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()),
-        bytesSerde.serialize("", "value0".getBytes()), ime0Size);
-    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()),
-        bytesSerde.serialize("", "value1".getBytes()), ime1Size);
-    IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()),
-        bytesSerde.serialize("", "value11".getBytes()), ime11Size);
-
-    // limit by number of messages 4/2 = 2 per partition
-    // limit by number of bytes - disabled
-    KafkaSystemConsumer consumer = createConsumer("4", "0"); // should disable
-
-    consumer.register(ssp0, "0");
-    consumer.register(ssp1, "0");
-    consumer.start();
-    consumer.messageSink.addMessage(ssp0, ime0);
-    // should be full by size, but not full by number of messages (1 of 2)
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0));
-    consumer.messageSink.addMessage(ssp1, ime1);
-    // not full neither by size nor by messages
-    Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1));
-    consumer.messageSink.addMessage(ssp1, ime11);
-    // not full by size, but should be full by messages
-    Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1));
-
-    Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0));
-    Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1));
-    Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0));
-    Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1));
-
-    consumer.stop();
-  }
-
-  // mock kafkaConsumer and SystemConsumer
-  static class MockKafkaConsumer extends KafkaConsumer {
-    public MockKafkaConsumer(Map<String, Object> configs) {
-      super(configs);
-    }
-  }
-
-  static class MockKafkaSystemConsumer extends KafkaSystemConsumer {
-    public MockKafkaSystemConsumer(Consumer kafkaConsumer, String systemName, Config config, String clientId,
-        KafkaSystemConsumerMetrics metrics, Clock clock) {
-      super(kafkaConsumer, systemName, config, clientId, metrics, clock);
-    }
-
-    @Override
-    void startConsumer() {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 340f0e7..fc5e75a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -73,7 +73,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     String inputTopicName2 = "ad-clicks";
     String outputTopicName = "user-ad-click-counts";
 
-    KafkaSystemAdmin.deleteMessagesCalled_$eq(false);
+    KafkaSystemAdmin.deleteMessageCalled = false;
 
     initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);
 
@@ -95,7 +95,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
     assertEquals(2, messages.size());
 
-    Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled());
+    Assert.assertFalse(KafkaSystemAdmin.deleteMessageCalled);
   }
 
   @Test
@@ -133,18 +133,20 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
 
     // Verify that messages in the intermediate stream will be deleted in 10 seconds
     long startTimeMs = System.currentTimeMillis();
-    for (String streamId: app.getIntermediateStreamIds()) {
+    for (String streamId : app.getIntermediateStreamIds()) {
       long remainingMessageNum = -1;
 
       while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {
         remainingMessageNum = 0;
-        SystemStreamMetadata metadatas = systemAdmin.getSystemStreamMetadata(
-            new HashSet<>(Arrays.asList(streamId)), new ExponentialSleepStrategy.Mock(3)
-        ).get(streamId).get();
+        SystemStreamMetadata metadatas =
+            (SystemStreamMetadata) systemAdmin.getSystemStreamMetadata(new HashSet<>(Arrays.asList(streamId)),
+                new ExponentialSleepStrategy.Mock(3)).get(streamId);
 
-        for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata().entrySet()) {
+        for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : metadatas.getSystemStreamPartitionMetadata()
+            .entrySet()) {
           SystemStreamPartitionMetadata metadata = entry.getValue();
-          remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset());
+          remainingMessageNum +=
+              Long.parseLong(metadata.getUpcomingOffset()) - Long.parseLong(metadata.getOldestOffset());
         }
       }
       assertEquals(0, remainingMessageNum);
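
The polling loop above reduces to one calculation: a stream has been fully deleted once, in every partition, the oldest offset has caught up with the upcoming offset. Isolated as a hypothetical helper (not part of the patch):

    import java.util.Map;
    import org.apache.samza.Partition;
    import org.apache.samza.system.SystemStreamMetadata;
    import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;

    final class RemainingMessages {
      // messages remaining in a stream = sum over partitions of (upcoming - oldest)
      static long count(SystemStreamMetadata metadata) {
        long remaining = 0;
        for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry
            : metadata.getSystemStreamPartitionMetadata().entrySet()) {
          SystemStreamPartitionMetadata m = entry.getValue();
          remaining += Long.parseLong(m.getUpcomingOffset()) - Long.parseLong(m.getOldestOffset());
        }
        return remaining;
      }
    }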

http://git-wip-us.apache.org/repos/asf/samza/blob/63d33fa0/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
index bc00305..2f18875 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/harness/AbstractIntegrationTestHarness.scala
@@ -17,36 +17,39 @@
  * under the License.
  */
 package org.apache.samza.test.harness
+
 import java.util.Properties
 
 import kafka.server.KafkaConfig
-import kafka.utils.{TestUtils, ZkUtils}
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.samza.system.kafka.KafkaSystemAdmin
+import kafka.utils.TestUtils
+import org.apache.samza.config.{JobConfig, KafkaConsumerConfig, MapConfig}
+import org.apache.samza.system.kafka.{KafkaSystemAdmin, KafkaSystemConsumer}
 
 /**
- * LinkedIn integration test harness for Kafka
- * This is simply a copy of open source code. We do this because java does not support trait and we are making it an
- * abstract class so that user's java test class can extend it.
- */
+  * Integration test harness for Kafka.
+  * Java does not support traits, so this is an abstract class that users'
+  * Java test classes can extend.
+  */
 abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHarness {
 
   def generateConfigs() =
     TestUtils.createBrokerConfigs(clusterSize(), zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps()))
 
   /**
-   * User can override this method to return the number of brokers they want.
-   * By default only one broker will be launched.
-   * @return the number of brokers needed in the Kafka cluster for the test.
-   */
+    * User can override this method to return the number of brokers they want.
+    * By default only one broker will be launched.
+    *
+    * @return the number of brokers needed in the Kafka cluster for the test.
+    */
   def clusterSize(): Int = 1
 
   /**
-   * User can override this method to apply customized configurations to the brokers.
-   * By default the only configuration is number of partitions when topics get automatically created. The default value
-   * is 1.
-   * @return The configurations to be used by brokers.
-   */
+    * User can override this method to apply customized configurations to the brokers.
+    * By default the only configuration is number of partitions when topics get automatically created. The default value
+    * is 1.
+    *
+    * @return The configurations to be used by brokers.
+    */
   def overridingProps(): Properties = {
     val props = new Properties()
     props.setProperty(KafkaConfig.NumPartitionsProp, 1.toString)
@@ -54,13 +57,30 @@ abstract class AbstractIntegrationTestHarness extends AbstractKafkaServerTestHar
   }
 
   /**
-   * Returns the bootstrap servers configuration string to be used by clients.
-   * @return bootstrap servers string.
-   */
+    * Returns the bootstrap servers configuration string to be used by clients.
+    *
+    * @return bootstrap servers string.
+    */
   def bootstrapServers(): String = super.bootstrapUrl
 
   def createSystemAdmin(system: String): KafkaSystemAdmin = {
-    new KafkaSystemAdmin(system, bootstrapServers, connectZk = () => ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled))
+
+    val map: java.util.Map[String, String] = new java.util.HashMap()
+
+    val KAFKA_CONSUMER_PROPERTY_PREFIX: String = "systems." + system + ".consumer."
+
+    map.put(KAFKA_CONSUMER_PROPERTY_PREFIX +
+      org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    map.put(JobConfig.JOB_NAME, "test.job")
+
+    map.put(KAFKA_CONSUMER_PROPERTY_PREFIX +
+      KafkaConsumerConfig.ZOOKEEPER_CONNECT, zkConnect)
+
+    val config = new MapConfig(map)
+    val consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, system,
+      KafkaConsumerConfig.createClientId("kafka-admin-consumer", config))
+
+    new KafkaSystemAdmin(system, config, KafkaSystemConsumer.createKafkaConsumerImpl(system, consumerConfig))
   }
 
 }
\ No newline at end of file
