Repository: samza Updated Branches: refs/heads/master 6666ced0c -> c65802a98
SAMZA-855; Upgrade Samza's Kafka client version to 0.10.0.1. This is based on PR 15, with merge conflicts resolved and the zkClient version set to 0.8 (compatible with Kafka 0.10.0.1). Tested and validated this patch against LinkedIn's internal Samza build. This looks good. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Yi Pan <[email protected]> Closes #33 from shanthoosh/kafka_10_upgrade Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c65802a9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c65802a9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c65802a9 Branch: refs/heads/master Commit: c65802a98ed8ee7c6481352387e42c58682dc067 Parents: 6666ced Author: Shanthoosh Venkataraman <[email protected]> Authored: Fri Dec 23 12:22:55 2016 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Dec 23 12:22:55 2016 -0800 ---------------------------------------------------------------------- build.gradle | 3 + gradle/dependency-versions.gradle | 4 +- .../kafka/KafkaCheckpointManager.scala | 7 +- .../kafka/KafkaCheckpointManagerFactory.scala | 11 +- .../samza/config/RegExTopicGenerator.scala | 6 +- .../samza/system/kafka/KafkaSystemAdmin.scala | 11 +- .../system/kafka/KafkaSystemConsumer.scala | 14 +-- .../samza/system/kafka/KafkaSystemFactory.scala | 9 +- .../scala/org/apache/samza/util/KafkaUtil.scala | 4 +- .../samza/system/kafka/MockKafkaProducer.java | 18 ++- .../java/org/apache/samza/utils/TestUtils.java | 112 ------------------- .../kafka/TestKafkaCheckpointManager.scala | 101 ++++++++--------- .../system/kafka/TestKafkaSystemAdmin.scala | 98 ++++++++-------- .../system/kafka/TestKafkaSystemConsumer.scala | 10 +- .../system/kafka/TestKafkaSystemProducer.scala | 8 +- .../src/main/python/configs/downloads.json | 4 +- samza-test/src/main/python/configs/kafka.json | 22 ++-- .../src/main/python/configs/zookeeper.json | 10 +- 
.../test/integration/StreamTaskTestUtil.scala | 103 ++++++++--------- 19 files changed, 220 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 0c04931..5b41c52 100644 --- a/build.gradle +++ b/build.gradle @@ -257,6 +257,7 @@ project(":samza-kafka_$scalaVersion") { compile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion" compile "org.apache.kafka:kafka-clients:$kafkaVersion" testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" + testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" @@ -648,6 +649,8 @@ project(":samza-test_$scalaVersion") { testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" testCompile "com.101tec:zkclient:$zkClientVersion" testCompile project(":samza-kafka_$scalaVersion") + testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test" + testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" testRuntime "org.slf4j:slf4j-simple:$slf4jVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 1987745..976a49c 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -25,10 +25,10 @@ junitVersion = "4.8.1" mockitoVersion = "1.8.4" scalaTestVersion = "2.2.4" - zkClientVersion = "0.3" + zkClientVersion = "0.8" 
zookeeperVersion = "3.3.4" metricsVersion = "2.2.0" - kafkaVersion = "0.8.2.1" + kafkaVersion = "0.10.0.1" commonsHttpClientVersion = "3.1" rocksdbVersion = "3.13.1" yarnVersion = "2.6.1" http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 8f18a92..6461f9d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -27,8 +27,9 @@ import kafka.api._ import kafka.common.{ErrorMapping, InvalidMessageSizeException, TopicAndPartition, UnknownTopicOrPartitionException} import kafka.consumer.SimpleConsumer import kafka.message.InvalidMessageException -import kafka.utils.Utils -import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZkUtils + +import org.apache.kafka.common.utils.Utils import org.apache.kafka.clients.producer.{Producer, ProducerRecord} import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} @@ -56,7 +57,7 @@ class KafkaCheckpointManager( fetchSize: Int, val metadataStore: TopicMetadataStore, connectProducer: () => Producer[Array[Byte], Array[Byte]], - val connectZk: () => ZkClient, + val connectZk: () => ZkUtils, systemStreamPartitionGrouperFactoryString: String, failOnCheckpointValidation: Boolean, val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 4e97376..c42882e 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -21,8 +21,7 @@ package org.apache.samza.checkpoint.kafka import java.util.Properties -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory} @@ -40,8 +39,8 @@ object KafkaCheckpointManagerFactory { "compression.type" -> "none") // Set the checkpoint topic configs to have a very small segment size and - // enable log compaction. This keeps job startup time small since there - // are fewer useless (overwritten) messages to read from the checkpoint + // enable log compaction. This keeps job startup time small since there + // are fewer useless (overwritten) messages to read from the checkpoint // topic. 
def getCheckpointTopicProperties(config: Config) = { val segmentBytes: Int = if (config == null) { @@ -79,7 +78,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val zkConnect = Option(consumerConfig.zkConnect) .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) val connectZk = () => { - new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + ZkUtils(zkConnect, 6000, 6000, false) } val socketTimeout = consumerConfig.socketTimeoutMs @@ -99,4 +98,4 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin config.failOnCheckpointValidation, checkpointTopicProperties = getCheckpointTopicProperties(config)) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala index bcbad27..4e3b247 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala @@ -20,7 +20,7 @@ package org.apache.samza.config import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ ZkUtils, ZKStringSerializer } +import kafka.utils.ZkUtils import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS } import org.apache.samza.SamzaException import org.apache.samza.util.Util @@ -102,10 +102,10 @@ class RegExTopicGenerator extends ConfigRewriter with Logging { val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, "") val zkConnect = Option(consumerConfig.zkConnect) .getOrElse(throw new SamzaException("No zookeeper.connect for system %s defined in config." 
format systemName)) - val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + val zkClient = new ZkClient(zkConnect, 6000, 6000) try { - ZkUtils.getAllTopics(zkClient) + ZkUtils(zkClient, isZkSecurityEnabled = false).getAllTopics() } finally { zkClient.close() } http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 5927cca..955fa44 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -21,14 +21,15 @@ package org.apache.samza.system.kafka import java.util -import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.system.{ExtendedSystemAdmin, SystemStreamMetadata, SystemStreamPartition} -import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging } +import org.apache.samza.util.{ ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging, KafkaUtil } import kafka.api._ import kafka.consumer.SimpleConsumer import kafka.common.{ TopicExistsException, TopicAndPartition } +import kafka.consumer.ConsumerConfig +import kafka.utils.ZkUtils import java.util.{ Properties, UUID } import scala.collection.JavaConversions import scala.collection.JavaConversions._ @@ -37,6 +38,7 @@ import kafka.consumer.ConsumerConfig import kafka.admin.AdminUtils import org.apache.samza.util.KafkaUtil + object KafkaSystemAdmin extends Logging { /** * A helper method that takes oldest, newest, and upcoming offsets for each @@ -97,10 +99,10 @@ class KafkaSystemAdmin( brokerListString: String, /** - * A method 
that returns a ZkClient for the Kafka system. This is invoked + * A method that returns a ZkUtils for the Kafka system. This is invoked * when the system admin is attempting to create a coordinator stream. */ - connectZk: () => ZkClient, + connectZk: () => ZkUtils, /** * Custom properties to use when the system admin tries to create a new @@ -183,6 +185,7 @@ class KafkaSystemAdmin( * Returns the offset for the message after the specified offset for each * SystemStreamPartition that was passed in. */ + override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = { // This is safe to do with Kafka, even if a topic is key-deduped. If the // offset doesn't exist on a compacted topic, Kafka will return the first http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 36567b6..fa685ee 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -24,7 +24,7 @@ import org.apache.samza.util.Logging import kafka.message.Message import kafka.message.MessageAndOffset import org.apache.samza.Partition -import kafka.utils.Utils +import org.apache.kafka.common.utils.Utils import org.apache.samza.util.Clock import java.util.UUID import kafka.serializer.DefaultDecoder @@ -197,13 +197,13 @@ private[kafka] class KafkaSystemConsumer( // This avoids trying to re-add the same topic partition repeatedly def refresh(tp: List[TopicAndPartition]) = { val head :: rest = tpToRefresh - // refreshBrokers can be called from abdicate and refreshDropped, - // both of which are triggered from BrokerProxy 
threads. To prevent - // accidentally creating multiple objects for the same broker, or - // accidentally not updating the topicPartitionsAndOffsets variable, - // we need to lock. + // refreshBrokers can be called from abdicate and refreshDropped, + // both of which are triggered from BrokerProxy threads. To prevent + // accidentally creating multiple objects for the same broker, or + // accidentally not updating the topicPartitionsAndOffsets variable, + // we need to lock. this.synchronized { - // Check if we still need this TopicAndPartition inside the + // Check if we still need this TopicAndPartition inside the // critical section. If we don't, then skip it. topicPartitionsAndOffsets.get(head) match { case Some(nextOffset) => http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index b574176..d0e3089 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -20,6 +20,7 @@ package org.apache.samza.system.kafka import java.util.Properties +import kafka.utils.ZkUtils import org.apache.samza.SamzaException import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore} import org.apache.samza.config.Config @@ -29,8 +30,6 @@ import org.apache.samza.config.JobConfig.Config2Job import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.system.SystemFactory import org.apache.samza.config.StorageConfig._ -import org.I0Itec.zkclient.ZkClient -import kafka.utils.ZKStringSerializer import org.apache.samza.system.SystemProducer import 
org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemConsumer @@ -90,8 +89,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { val getProducer = () => { new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) } val metrics = new KafkaSystemProducerMetrics(systemName, registry) - // Unlike consumer, no need to use encoders here, since they come for free - // inside the producer configs. Kafka's producer will handle all of this + // Unlike consumer, no need to use encoders here, since they come for free + // inside the producer configs. Kafka's producer will handle all of this // for us. new KafkaSystemProducer( @@ -111,7 +110,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { val zkConnect = Option(consumerConfig.zkConnect) .getOrElse(throw new SamzaException("no zookeeper.connect defined in config")) val connectZk = () => { - new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + ZkUtils(zkConnect, 6000, 6000, false) } val coordinatorStreamProperties = getCoordinatorTopicProperties(config) val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index e95f052..0f0bc22 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -22,7 +22,7 @@ package org.apache.samza.util import java.util.Properties import java.util.concurrent.atomic.AtomicLong import kafka.admin.AdminUtils -import org.I0Itec.zkclient.ZkClient +import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{Producer, ProducerRecord} import 
org.apache.kafka.common.PartitionInfo import org.apache.samza.config.Config @@ -86,7 +86,7 @@ object KafkaUtil extends Logging { } class KafkaUtil(val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - val connectZk: () => ZkClient) extends Logging { + val connectZk: () => ZkUtils) extends Logging { /** * Common code for creating a topic in Kafka * http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java index 6f498de..aaa949d 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java @@ -39,9 +39,9 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Metric; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.samza.utils.TestUtils; import org.apache.kafka.common.MetricName; - +import org.apache.kafka.common.record.Record; +import org.apache.kafka.test.TestUtils; public class MockKafkaProducer implements Producer<byte[], byte[]> { @@ -113,7 +113,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { } private RecordMetadata getRecordMetadata(ProducerRecord record) { - return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get()); + return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 
0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1); } @Override @@ -174,6 +174,16 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { } + @Override + public void close(long timeout, TimeUnit timeUnit) { + + } + + public synchronized void flush () { + + } + + private static class FutureFailure implements Future<RecordMetadata> { private final ExecutionException exception; @@ -215,7 +225,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { public FutureSuccess(ProducerRecord record, int offset) { this.record = record; - this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset); + this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java b/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java deleted file mode 100644 index 2fa743f..0000000 --- a/samza-kafka/src/test/java/org/apache/samza/utils/TestUtils.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samza.utils; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; - -import static java.util.Arrays.asList; - - -/** - * Copied from :kafka-clients API as a workaround until KAFKA-1861 is resolved - * Helper functions for writing unit tests - */ -public class TestUtils { - - public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); - - public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - public static String DIGITS = "0123456789"; - public static String LETTERS_AND_DIGITS = LETTERS + DIGITS; - - /* A consistent random number generator to make tests repeatable */ - public static final Random seededRandom = new Random(192348092834L); - public static final Random random = new Random(); - - public static Cluster singletonCluster(String topic, int partitions) { - return clusterWith(1, topic, partitions); - } - - public static Cluster clusterWith(int nodes, String topic, int partitions) { - Node[] ns = new Node[nodes]; - for (int i = 0; i < nodes; i++) - ns[i] = new Node(0, "localhost", 1969); - List<PartitionInfo> parts = new ArrayList<PartitionInfo>(); - for (int i = 0; i < partitions; i++) - parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); - return new Cluster(asList(ns), parts); - } - - /** - * Choose a number of random 
available ports - */ - public static int[] choosePorts(int count) { - try { - ServerSocket[] sockets = new ServerSocket[count]; - int[] ports = new int[count]; - for (int i = 0; i < count; i++) { - sockets[i] = new ServerSocket(0); - ports[i] = sockets[i].getLocalPort(); - } - for (int i = 0; i < count; i++) - sockets[i].close(); - return ports; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Choose an available port - */ - public static int choosePort() { - return choosePorts(1)[0]; - } - - /** - * Generate an array of random bytes - * - * @param size The size of the array - */ - public static byte[] randomBytes(int size) { - byte[] bytes = new byte[size]; - seededRandom.nextBytes(bytes); - return bytes; - } - - /** - * Generate a random string of letters and digits of the given length - * - * @param len The length of the string - * @return The random string - */ - public static String randomString(int len) { - StringBuilder b = new StringBuilder(); - for (int i = 0; i < len; i++) - b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length()))); - return b.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index e6815da..1f2f62f 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -22,10 +22,11 @@ package org.apache.samza.checkpoint.kafka import kafka.admin.AdminUtils import kafka.common.{InvalidMessageSizeException, UnknownTopicOrPartitionException} import 
kafka.message.InvalidMessageException -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer} -import kafka.zk.EmbeddedZookeeper -import org.I0Itec.zkclient.ZkClient +import kafka.server.{KafkaConfig, KafkaServer, ConfigType} +import kafka.utils.{CoreUtils, TestUtils, ZkUtils} +import kafka.integration.KafkaServerTestHarness + +import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.config.{JobConfig, KafkaProducerConfig, MapConfig} @@ -41,71 +42,59 @@ import org.junit._ import scala.collection.JavaConversions._ import scala.collection._ -class TestKafkaCheckpointManager { +class TestKafkaCheckpointManager extends KafkaServerTestHarness { + + protected def numBrokers: Int = 3 + + def generateConfigs() = { + val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true) + props.map(KafkaConfig.fromProps) + } val checkpointTopic = "checkpoint-topic" val serdeCheckpointTopic = "checkpoint-topic-invalid-serde" val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null) - val zkConnect: String = TestZKUtils.zookeeperConnect - var zkClient: ZkClient = null - val zkConnectionTimeout = 6000 - val zkSessionTimeout = 6000 - - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val ports = TestUtils.choosePorts(3) - val (port1, port2, port3) = (ports(0), ports(1), ports(2)) - - val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - props1.put("controlled.shutdown.enable", "true") - val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - props1.put("controlled.shutdown.enable", "true") - val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - props1.put("controlled.shutdown.enable", "true") - - val config = new java.util.HashMap[String, Object]() - val brokers = 
"localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) - config.put("acks", "all") - config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") - config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString) - config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) - val producerConfig = new KafkaProducerConfig("kafka", "i001", config) + + val zkSecure = JaasUtils.isZkSecurityEnabled() + val partition = new Partition(0) val partition2 = new Partition(1) val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123")) val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345")) - var zookeeper: EmbeddedZookeeper = null - var server1: KafkaServer = null - var server2: KafkaServer = null - var server3: KafkaServer = null + + var producerConfig: KafkaProducerConfig = null + var metadataStore: TopicMetadataStore = null var failOnTopicValidation = true val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName @Before - def beforeSetupServers { - zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(new KafkaConfig(props1)) - server2 = TestUtils.createServer(new KafkaConfig(props2)) - server3 = TestUtils.createServer(new KafkaConfig(props3)) + override def setUp { + super.setUp + + TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update") + + val config = new java.util.HashMap[String, Object]() + val brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") + + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) + config.put("acks", "all") + config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") + config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString) + 
config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) + producerConfig = new KafkaProducerConfig("kafka", "i001", config) + metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") } @After - def afterCleanLogDirs { - server1.shutdown - server1.awaitShutdown() - server2.shutdown - server2.awaitShutdown() - server3.shutdown - server3.awaitShutdown() - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) - zookeeper.shutdown + override def tearDown() { + if (servers != null) { + servers.foreach(_.shutdown()) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + } + super.tearDown } private def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint, cpTopic: String = checkpointTopic) = { @@ -127,7 +116,7 @@ class TestKafkaCheckpointManager { private def createCheckpointTopic(cpTopic: String = checkpointTopic, partNum: Int = 1) = { - val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure) try { AdminUtils.createTopic( zkClient, @@ -151,8 +140,8 @@ class TestKafkaCheckpointManager { kcm.kafkaUtil.validateTopicPartitionCount(checkpointTopic, "kafka", metadataStore, 1) // check that log compaction is enabled. 
- val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) - val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic) + val zkClient = ZkUtils(zkConnect, 6000, 6000, zkSecure) + val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic) zkClient.close assertEquals("compact", topicConfig.get("cleanup.policy")) assertEquals("26214400", topicConfig.get("segment.bytes")) @@ -242,7 +231,7 @@ class TestKafkaCheckpointManager { fetchSize = 300 * 1024, metadataStore = metadataStore, connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), - connectZk = () => new ZkClient(zkConnect, 60000, 60000, ZKStringSerializer), + connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]()))) @@ -261,7 +250,7 @@ class TestKafkaCheckpointManager { fetchSize = 300 * 1024, metadataStore = metadataStore, connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties), - connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), + connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, failOnCheckpointValidation = failOnTopicValidation, serde = new InvalideSerde(exception), http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index f00405d..0e3c9b5 100644 --- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -28,10 +28,11 @@ import kafka.admin.AdminUtils import kafka.common.{ErrorMapping, LeaderNotAvailableException} import kafka.consumer.{Consumer, ConsumerConfig} import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, TestZKUtils, Utils, ZKStringSerializer} -import kafka.zk.EmbeddedZookeeper -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import kafka.utils.{TestUtils, ZkUtils} +import kafka.integration.KafkaServerTestHarness +import org.apache.kafka.common.security.JaasUtils + import org.apache.samza.Partition import org.apache.samza.config.KafkaProducerConfig import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata @@ -42,54 +43,56 @@ import org.junit._ import scala.collection.JavaConversions._ -object TestKafkaSystemAdmin { + +object TestKafkaSystemAdmin extends KafkaServerTestHarness { + val SYSTEM = "kafka" val TOPIC = "input" val TOPIC2 = "input2" val TOTAL_PARTITIONS = 50 val REPLICATION_FACTOR = 2 + val zkSecure = JaasUtils.isZkSecurityEnabled() + + protected def numBrokers: Int = 3 - val zkConnect: String = TestZKUtils.zookeeperConnect - var zkClient: ZkClient = null - val zkConnectionTimeout = 6000 - val zkSessionTimeout = 6000 - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val ports = TestUtils.choosePorts(3) - val (port1, port2, port3) = (ports(0), ports(1), ports(2)) - - val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - - val config = new util.HashMap[String, Object]() - val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) - config.put("bootstrap.servers", brokers) - config.put("acks", 
"all") - config.put("serializer.class", "kafka.serializer.StringEncoder") - val producerConfig = new KafkaProducerConfig("kafka", "i001", config) var producer: KafkaProducer[Array[Byte], Array[Byte]] = null - var zookeeper: EmbeddedZookeeper = null - var server1: KafkaServer = null - var server2: KafkaServer = null - var server3: KafkaServer = null var metadataStore: TopicMetadataStore = null + var producerConfig: KafkaProducerConfig = null + var brokers: String = null + + def generateConfigs() = { + val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true) + props.map(KafkaConfig.fromProps) + } @BeforeClass - def beforeSetupServers { - zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(new KafkaConfig(props1)) - server2 = TestUtils.createServer(new KafkaConfig(props2)) - server3 = TestUtils.createServer(new KafkaConfig(props3)) - zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer) + override def setUp { + super.setUp + + val config = new java.util.HashMap[String, Object]() + + brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") + + config.put("bootstrap.servers", brokers) + config.put("acks", "all") + config.put("serializer.class", "kafka.serializer.StringEncoder") + + producerConfig = new KafkaProducerConfig("kafka", "i001", config) + producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") } + + @AfterClass + override def tearDown { + super.tearDown + } + + def createTopic(topicName: String, partitionCount: Int) { AdminUtils.createTopic( - zkClient, + zkUtils, topicName, partitionCount, REPLICATION_FACTOR) @@ -133,21 +136,6 @@ object TestKafkaSystemAdmin { Consumer.create(consumerConfig) } - @AfterClass - def afterCleanLogDirs { - producer.close() - server1.shutdown - server1.awaitShutdown() - server2.shutdown - server2.awaitShutdown() - server3.shutdown - 
server3.awaitShutdown() - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) - zkClient.close - zookeeper.shutdown - } } /** @@ -158,7 +146,7 @@ class TestKafkaSystemAdmin { import TestKafkaSystemAdmin._ // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated - val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) + val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) @Test def testShouldAssembleMetadata { @@ -219,7 +207,7 @@ class TestKafkaSystemAdmin { // Empty Kafka topics should have a next offset of 0. assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset) - // Add a new message to one of the partitions, and verify that it works as + // Add a new message to one of the partitions, and verify that it works as // expected. producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get() metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC)) @@ -285,7 +273,8 @@ class TestKafkaSystemAdmin { @Test def testShouldCreateCoordinatorStream { val topic = "test-coordinator-stream" - val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3) + val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3) + systemAdmin.createCoordinatorStream(topic) validateTopic(topic, 1) val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo) @@ -296,9 +285,10 @@ class TestKafkaSystemAdmin { assertEquals(3, partitionMetadata.replicas.size) } - class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => new ZkClient(zkConnect, 6000, 6000, 
ZKStringSerializer)) { + class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) { import kafka.api.TopicMetadata var metadataCallCount = 0 + // Simulate Kafka telling us that the leader for the topic is not available override def getTopicMetadata(topics: Set[String]) = { metadataCallCount += 1 http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 3b3ed3d..8a5cbc2 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -67,7 +67,7 @@ class TestKafkaSystemConsumer { val metrics = new KafkaSystemConsumerMetrics // Lie and tell the store that the partition metadata is empty. We can't // use partition metadata because it has Broker in its constructor, which - // is package private to Kafka. + // is package private to Kafka. val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0))) var hosts = List[String]() var getHostPortCount = 0 @@ -81,7 +81,7 @@ class TestKafkaSystemConsumer { override def createBrokerProxy(host: String, port: Int): BrokerProxy = { new BrokerProxy(host, port, systemName, "", metrics, sink) { override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { - // Skip this since we normally do verification of offsets, which + // Skip this since we normally do verification of offsets, which // tries to connect to Kafka. Rather than mock that, just forget it. 
nextOffsets.size } @@ -159,13 +159,12 @@ class TestKafkaSystemConsumer { val msg = Array[Byte](5, 112, 9, 126) val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654) - // 4 data + 14 Message overhead + 80 IncomingMessageEnvelope overhead + // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354) - assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) + assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) } - @Test def testFetchThresholdBytesDisabled { val metadataStore = new MockMetadataStore @@ -190,4 +189,3 @@ class TestKafkaSystemConsumer { class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore { def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata } - http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala index e4250e7..7331611 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala @@ -27,6 +27,7 @@ import java.util import org.junit.Assert._ import org.scalatest.Assertions.intercept import org.apache.kafka.common.errors.{TimeoutException, RecordTooLargeException} +import org.apache.kafka.test.MockSerializer import org.apache.samza.SamzaException @@ -37,13 +38,14 @@ class TestKafkaSystemProducer { @Test def testKafkaProducer { + val mockProducer = 
new MockProducer(true, new MockSerializer, new MockSerializer) val systemProducer = new KafkaSystemProducer(systemName = "test", - getProducer = () => { new MockProducer(true) }, - metrics = new KafkaSystemProducerMetrics) + getProducer = () => mockProducer, + metrics = new KafkaSystemProducerMetrics) systemProducer.register("test") systemProducer.start systemProducer.send("test", someMessage) - assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer].history().size()) + assertEquals(1, systemProducer.producer.asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size()) systemProducer.stop } http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/downloads.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json index 22694c0..c468156 100644 --- a/samza-test/src/main/python/configs/downloads.json +++ b/samza-test/src/main/python/configs/downloads.json @@ -1,5 +1,5 @@ { - "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz", - "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz", + "url_kafka": "http://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.10-0.10.0.1.tgz", + "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz", "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz" } http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/kafka.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json index ab2f346..ece2a34 100644 --- a/samza-test/src/main/python/configs/kafka.json +++ b/samza-test/src/main/python/configs/kafka.json @@ -3,21 +3,21 @@ 
"kafka_instance_0": "localhost" }, "kafka_port": 9092, - "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties", - "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh", + "kafka_start_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.0.1/config/server.properties", + "kafka_stop_cmd": "kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh", "kafka_install_path": "deploy/kafka", - "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz", + "kafka_executable": "kafka_2.10-0.10.0.1.tgz", "kafka_post_install_cmds": [ - "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh", - "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties", - "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties" + "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.10-0.10.0.1/bin/kafka-server-stop.sh", + "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.10-0.10.0.1/config/server.properties", + "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.10-0.10.0.1/config/server.properties" ], "kafka_logs": [ "log-cleaner.log", - "kafka_2.9.2-0.8.2.0/logs/controller.log", - "kafka_2.9.2-0.8.2.0/logs/kafka-request.log", - "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log", - "kafka_2.9.2-0.8.2.0/logs/server.log", - "kafka_2.9.2-0.8.2.0/logs/state-change.log" + "kafka_2.10-0.10.0.1/logs/controller.log", + "kafka_2.10-0.10.0.1/logs/kafka-request.log", + "kafka_2.10-0.10.0.1/logs/kafkaServer-gc.log", + "kafka_2.10-0.10.0.1/logs/server.log", + "kafka_2.10-0.10.0.1/logs/state-change.log" ] } http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/main/python/configs/zookeeper.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/zookeeper.json b/samza-test/src/main/python/configs/zookeeper.json index 2762197..250bfce 100644 --- 
a/samza-test/src/main/python/configs/zookeeper.json +++ b/samza-test/src/main/python/configs/zookeeper.json @@ -2,13 +2,13 @@ "zookeeper_hosts": { "zookeeper_instance_0": "localhost" }, - "zookeeper_start_cmd": "zookeeper-3.4.3/bin/zkServer.sh start", - "zookeeper_stop_cmd": "zookeeper-3.4.3/bin/zkServer.sh stop", + "zookeeper_start_cmd": "zookeeper-3.4.6/bin/zkServer.sh start", + "zookeeper_stop_cmd": "zookeeper-3.4.6/bin/zkServer.sh stop", "zookeeper_install_path": "deploy/zookeeper", - "zookeeper_executable": "zookeeper-3.4.3.tar.gz", + "zookeeper_executable": "zookeeper-3.4.6.tar.gz", "zookeeper_post_install_cmds": [ - "cp zookeeper-3.4.3/conf/zoo_sample.cfg zookeeper-3.4.3/conf/zoo.cfg", - "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.3/conf/zoo.cfg" + "cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg", + "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.6/conf/zoo.cfg" ], "zookeeper_logs": [ "zookeeper.out" http://git-wip-us.apache.org/repos/asf/samza/blob/c65802a9/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 8d7e3fe..5d82b92 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -22,17 +22,19 @@ package org.apache.samza.test.integration import java.util import java.util.Properties import java.util.concurrent.{CountDownLatch, TimeUnit} +import javax.security.auth.login.Configuration import kafka.admin.AdminUtils import kafka.consumer.{Consumer, ConsumerConfig} import kafka.message.MessageAndMetadata import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.{TestUtils, TestZKUtils, 
Utils, ZKStringSerializer} +import kafka.utils.{TestUtils, CoreUtils, ZkUtils} import kafka.zk.EmbeddedZookeeper -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord} import org.apache.samza.Partition import org.apache.samza.checkpoint.Checkpoint +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.JaasUtils import org.apache.samza.config.{Config, KafkaProducerConfig, MapConfig} import org.apache.samza.container.TaskName import org.apache.samza.job.local.ThreadJobFactory @@ -44,7 +46,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMeta import org.junit.Assert._ import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap} +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap} /* * This creates an singleton instance of TestBaseStreamTask and implement the helper functions to @@ -58,39 +60,19 @@ object StreamTaskTestUtil { val TOTAL_TASK_NAMES = 1 val REPLICATION_FACTOR = 3 - val zkConnect: String = TestZKUtils.zookeeperConnect - var zkClient: ZkClient = null val zkConnectionTimeout = 6000 val zkSessionTimeout = 6000 - val brokerId1 = 0 - val brokerId2 = 1 - val brokerId3 = 2 - val ports = TestUtils.choosePorts(3) - val (port1, port2, port3) = (ports(0), ports(1), ports(2)) - - val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - props1.setProperty("auto.create.topics.enable","false") - props2.setProperty("auto.create.topics.enable","false") - props3.setProperty("auto.create.topics.enable","false") - - val config = new util.HashMap[String, Object]() - val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) - config.put("bootstrap.servers", brokers) - 
config.put("request.required.acks", "-1") - config.put("serializer.class", "kafka.serializer.StringEncoder") - config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") - config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString()) - val producerConfig = new KafkaProducerConfig("kafka", "i001", config) + var zkUtils: ZkUtils = null + var zookeeper: EmbeddedZookeeper = null + var brokers: String = null + def zkPort: Int = zookeeper.port + def zkConnect: String = s"127.0.0.1:$zkPort" + var producer: Producer[Array[Byte], Array[Byte]] = null val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123")) val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345")) - var zookeeper: EmbeddedZookeeper = null - var server1: KafkaServer = null - var server2: KafkaServer = null - var server3: KafkaServer = null + var metadataStore: TopicMetadataStore = null /* @@ -107,8 +89,8 @@ object StreamTaskTestUtil { "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic "systems.kafka.samza.msg.serde" -> "string", - "systems.kafka.consumer.zookeeper.connect" -> zkConnect, - "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1), + "systems.kafka.consumer.zookeeper.connect" -> "localhost:2181", + "systems.kafka.producer.bootstrap.servers" -> "localhost:9092", // Since using state, need a checkpoint manager "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", "task.checkpoint.system" -> "kafka", @@ -122,12 +104,36 @@ object StreamTaskTestUtil { TestTask.reset() } + var servers: Buffer[KafkaServer] = null + def beforeSetupServers { - zookeeper = new EmbeddedZookeeper(zkConnect) - server1 = TestUtils.createServer(new KafkaConfig(props1)) - server2 = TestUtils.createServer(new 
KafkaConfig(props2)) - server3 = TestUtils.createServer(new KafkaConfig(props3)) - zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer) + zookeeper = new EmbeddedZookeeper() + zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, JaasUtils.isZkSecurityEnabled()) + + val props = TestUtils.createBrokerConfigs(3, zkConnect, true) + + val configs = props.map(p => { + p.setProperty("auto.create.topics.enable","false") + KafkaConfig.fromProps(p) + }) + + servers = configs.map(TestUtils.createServer(_)).toBuffer + + val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT) + brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") + + jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect, + "systems.kafka.producer.bootstrap.servers" -> brokers) + + val config = new util.HashMap[String, Object]() + + config.put("bootstrap.servers", brokers) + config.put("request.required.acks", "-1") + config.put("serializer.class", "kafka.serializer.StringEncoder") + config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") + config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString()) + val producerConfig = new KafkaProducerConfig("kafka", "i001", config) + producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") @@ -137,7 +143,7 @@ object StreamTaskTestUtil { def createTopics { AdminUtils.createTopic( - zkClient, + zkUtils, INPUT_TOPIC, TOTAL_TASK_NAMES, REPLICATION_FACTOR) @@ -174,18 +180,15 @@ object StreamTaskTestUtil { } def afterCleanLogDirs { - producer.close() - server1.shutdown - server1.awaitShutdown() - server2.shutdown - server2.awaitShutdown() - server3.shutdown - server3.awaitShutdown() - Utils.rm(server1.config.logDirs) - Utils.rm(server2.config.logDirs) - Utils.rm(server3.config.logDirs) - zkClient.close - 
zookeeper.shutdown + servers.foreach(_.shutdown()) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + + if (zkUtils != null) + CoreUtils.swallow(zkUtils.close()) + if (zookeeper != null) + CoreUtils.swallow(zookeeper.shutdown()) + Configuration.setConfiguration(null) + } }
