Repository: samza Updated Branches: refs/heads/master 07f28e948 -> a6540b4e3
SAMZA-1530; Bump up Kafka dependency to 0.11 Author: Dong Lin <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #395 from lindong28/SAMZA-1530 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6540b4e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6540b4e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6540b4e Branch: refs/heads/master Commit: a6540b4e3d1d3916210c50be9b4b2b4920f885bb Parents: 07f28e9 Author: Dong Lin <[email protected]> Authored: Wed Jan 10 10:52:38 2018 -0800 Committer: xiliu <[email protected]> Committed: Wed Jan 10 10:52:38 2018 -0800 ---------------------------------------------------------------------- bin/check-all.sh | 2 +- gradle/dependency-versions.gradle | 2 +- .../apache/samza/system/kafka/BrokerProxy.scala | 13 ++++---- .../apache/samza/system/kafka/GetOffset.scala | 4 +-- .../samza/system/kafka/KafkaSystemAdmin.scala | 8 ++--- .../samza/system/kafka/TopicMetadataCache.scala | 2 +- .../scala/org/apache/samza/util/KafkaUtil.scala | 8 +++-- .../samza/system/kafka/MockKafkaProducer.java | 25 +++++++++++++-- .../kafka/TestKafkaCheckpointManager.scala | 7 ++--- .../samza/system/kafka/TestBrokerProxy.scala | 9 +++--- .../system/kafka/TestKafkaSystemAdmin.scala | 26 ++++++---------- .../system/kafka/TestKafkaSystemConsumer.scala | 4 +-- .../system/kafka/TestTopicMetadataCache.scala | 32 +++++++++++--------- .../org/apache/samza/utils/TestKafkaUtil.scala | 7 +++-- .../processor/TestZkLocalApplicationRunner.java | 2 +- .../test/integration/StreamTaskTestUtil.scala | 7 ++--- 16 files changed, 86 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/bin/check-all.sh ---------------------------------------------------------------------- diff --git a/bin/check-all.sh b/bin/check-all.sh index 2f9f03c..f168bc8 100755 --- a/bin/check-all.sh +++ b/bin/check-all.sh @@ -21,7 +21,7 @@ set -e -SCALAs=( "2.10" "2.11" "2.12" ) +SCALAs=( "2.11" "2.12" ) JDKs=( "JAVA8_HOME" ) YARNs=( "2.6.1" "2.7.1" ) http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 20a1d56..2e45914 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -33,7 +33,7 @@ jodaTimeVersion = "2.2" joptSimpleVersion = "3.2" junitVersion = "4.8.1" - kafkaVersion = "0.10.1.1" + kafkaVersion = "0.11.0.2" log4jVersion = "1.2.17" metricsVersion = "2.2.0" mockitoVersion = "1.10.19" http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 5338886..8a6618d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -182,7 +182,7 @@ class BrokerProxy( firstCallBarrier.countDown() // Split response into errors and non errors, processing the errors first - val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError) + val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError) handleErrors(errorResponses, response) @@ -219,18 +219,17 @@ class BrokerProxy( immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) } - def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = { + def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = { // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves - case class Error(tp: TopicAndPartition, code: Short, exception: Throwable) + case class Error(tp: TopicAndPartition, code: Short, exception: Exception) // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset) // Convert FetchResponse into easier-to-work-with Errors val errors = for ( (topicAndPartition, responseData) <- errorResponses; - errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values... - exception <- Option(ErrorMapping.exceptionFor(errorCode)) - ) yield new Error(topicAndPartition, errorCode, exception) + error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values... + ) yield new Error(topicAndPartition, error.code(), error.exception()) val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode } val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode) @@ -241,7 +240,7 @@ class BrokerProxy( // handle the recoverable errors. remainingErrors.foreach(e => { warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(",")) - KafkaUtil.maybeThrowException(e.code) }) + KafkaUtil.maybeThrowException(e.exception) }) notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp)) http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index 5528702..040e246 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -60,7 +60,7 @@ class GetOffset( val messages = consumer.defaultFetch((topicAndPartition, offset.toLong)) if (messages.hasError) { - KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition)) + KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception()) } info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition)) @@ -86,7 +86,7 @@ class GetOffset( .get(topicAndPartition) .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition)) - KafkaUtil.maybeThrowException(partitionOffsetResponse.error) + KafkaUtil.maybeThrowException(partitionOffsetResponse.error.exception()) partitionOffsetResponse .offsets http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 013b292..4715141 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -164,7 +164,7 @@ class KafkaSystemAdmin( metadataTTL) val result = metadata.map { case (topic, topicMetadata) => { - KafkaUtil.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.error.exception()) val partitionsMap = topicMetadata.partitionsMetadata.map { pm => new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "") @@ -350,7 +350,7 @@ class KafkaSystemAdmin( .values // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] .flatMap(topicMetadata => { - KafkaUtil.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.error.exception()) topicMetadata .partitionsMetadata // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] @@ -390,7 +390,7 @@ class KafkaSystemAdmin( .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo)) .partitionErrorAndOffsets .mapValues(partitionErrorAndOffset => { - KafkaUtil.maybeThrowException(partitionErrorAndOffset.error) + KafkaUtil.maybeThrowException(partitionErrorAndOffset.error.exception()) partitionErrorAndOffset.offsets.head }) @@ -480,7 +480,7 @@ class KafkaSystemAdmin( val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo, metadataTTL) val topicMetadata = topicMetadataMap(topicName) - KafkaUtil.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.error.exception()) val partitionCount = topicMetadata.partitionsMetadata.length if (partitionCount != spec.getPartitionCount) { http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala index 82ecf1a..8a3ab2b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala @@ -73,6 +73,6 @@ object TopicMetadataCache extends Logging { * partition's metadata has a bad errorCode. */ def hasBadErrorCode(streamMetadata: TopicMetadata) = { - KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode)) + KafkaUtil.isBadErrorCode(streamMetadata.error.code()) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.error.code())) } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 1410cbb..5b0137a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -29,7 +29,8 @@ import org.apache.samza.config.{ApplicationConfig, Config, ConfigException} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.execution.StreamManager import org.apache.samza.system.OutgoingMessageEnvelope -import kafka.common.{ErrorMapping, ReplicaNotAvailableException} +import org.apache.kafka.common.errors.ReplicaNotAvailableException +import kafka.common.ErrorMapping import org.apache.kafka.common.errors.TopicExistsException import org.apache.samza.system.kafka.TopicMetadataCache @@ -71,9 +72,10 @@ object KafkaUtil extends Logging { * <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol * docs</a>, ReplicaNotAvailableException can be safely ignored. */ - def maybeThrowException(code: Short) { + def maybeThrowException(e: Exception) { try { - ErrorMapping.maybeThrowException(code) + if (e != null) + throw e } catch { case e: ReplicaNotAvailableException => debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.") http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java index 024c6e6..e66b7c3 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/MockKafkaProducer.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import kafka.producer.ProducerClosedException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -42,6 +43,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.test.TestUtils; public class MockKafkaProducer implements Producer<byte[], byte[]> { @@ -98,7 +100,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { } private RecordMetadata getRecordMetadata(ProducerRecord record) { - return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), Record.NO_TIMESTAMP, -1, -1, -1); + return new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, this.msgsSent.get(), -1L, -1, -1, -1); } @Override @@ -190,6 +192,25 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { new FlushRunnable(0).run(); } + public void initTransactions() { + + } + + public void abortTransaction() { + + } + + public void beginTransaction() { + + } + + public void commitTransaction() { + + } + + public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) { + + } private static class FutureFailure implements Future<RecordMetadata> { @@ -232,7 +253,7 @@ public class MockKafkaProducer implements Producer<byte[], byte[]> { public FutureSuccess(ProducerRecord record, int offset) { this.record = record; - this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, Record.NO_TIMESTAMP, -1, -1, -1); + this._metadata = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 0 : record.partition()), 0, offset, RecordBatch.NO_TIMESTAMP, -1, -1, -1); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index ec9f3a0..86cb418 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -52,15 +52,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val checkpoint1 = new Checkpoint(ImmutableMap.of(ssp, "offset-1")) val checkpoint2 = new Checkpoint(ImmutableMap.of(ssp, "offset-2")) val taskName = new TaskName("Partition 0") - - var brokers: String = null var config: Config = null @Before override def setUp { super.setUp TestUtils.waitUntilTrue(() => servers.head.metadataCache.getAliveBrokers.size == numBrokers, "Wait for cache to update") - brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") config = getConfig() } @@ -140,7 +137,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val defaultSerializer = classOf[ByteArraySerializer].getCanonicalName val props = new Properties() props.putAll(ImmutableMap.of( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers, + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, defaultSerializer, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, defaultSerializer)) props @@ -151,7 +148,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { .put(JobConfig.JOB_NAME, "some-job-name") .put(JobConfig.JOB_ID, "i001") .put(s"systems.$checkpointSystemName.samza.factory", classOf[KafkaSystemFactory].getCanonicalName) - .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokers) + .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", brokerList) .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", zkConnect) .put("task.checkpoint.system", checkpointSystemName) .build()) http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index f0bdafd..d510076 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -24,9 +24,10 @@ import java.nio.ByteBuffer import java.util.concurrent.CountDownLatch import kafka.api.{PartitionOffsetsResponse, _} -import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.common.TopicAndPartition import kafka.consumer.SimpleConsumer import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet} +import org.apache.kafka.common.protocol.Errors import org.apache.samza.SamzaException import org.apache.samza.util.Logging import org.junit.Assert._ @@ -165,7 +166,7 @@ class TestBrokerProxy extends Logging { messageSet } - val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet) + val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet) val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData) when(fetchResponse.data).thenReturn(map.toSeq) @@ -257,12 +258,12 @@ class TestBrokerProxy extends Logging { } val mfr = mock(classOf[FetchResponse]) when(mfr.hasError).thenReturn(true) - when(mfr.errorCode("topic", 42)).thenReturn(ErrorMapping.OffsetOutOfRangeCode) + when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE) val messageSet = mock(classOf[MessageSet]) when(messageSet.iterator).thenReturn(Iterator.empty) val response = mock(classOf[FetchResponsePartitionData]) - when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode) + when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE) val responseMap = Map(tp -> response) when(mfr.data).thenReturn(responseMap.toSeq) invocationCount += 1 http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 65c43f5..2039447 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -24,7 +24,8 @@ package org.apache.samza.system.kafka import java.util.{Properties, UUID} import kafka.admin.AdminUtils -import kafka.common.{ErrorMapping, LeaderNotAvailableException} +import org.apache.kafka.common.errors.LeaderNotAvailableException +import org.apache.kafka.common.protocol.Errors import kafka.consumer.{Consumer, ConsumerConfig} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig @@ -68,19 +69,13 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { @BeforeClass override def setUp { super.setUp - val config = new java.util.HashMap[String, String]() - - brokers = brokerList.split(",").map(p => "localhost" + p).mkString(",") - - config.put("bootstrap.servers", brokers) + config.put("bootstrap.servers", brokerList) config.put("acks", "all") config.put("serializer.class", "kafka.serializer.StringEncoder") - producerConfig = new KafkaProducerConfig("kafka", "i001", config) - producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties) - metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name") + metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name") } @@ -107,9 +102,8 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { try { val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo) val topicMetadata = topicMetadataMap(topic) - val errorCode = topicMetadata.errorCode - KafkaUtil.maybeThrowException(errorCode) + KafkaUtil.maybeThrowException(topicMetadata.error.exception()) done = expectedPartitionCount == topicMetadata.partitionsMetadata.size } catch { @@ -137,11 +131,11 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { } def createSystemAdmin: KafkaSystemAdmin = { - new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) + new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) } def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = { - new KafkaSystemAdmin(SYSTEM, brokers, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map()) + new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties, coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map()) } } @@ -281,7 +275,7 @@ class TestKafkaSystemAdmin { @Test def testShouldCreateCoordinatorStream { val topic = "test-coordinator-stream" - val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3) + val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3) val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") systemAdmin.createStream(spec) @@ -294,14 +288,14 @@ class TestKafkaSystemAdmin { assertEquals(3, partitionMetadata.replicas.size) } - class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokers, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) { + class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) { import kafka.api.TopicMetadata var metadataCallCount = 0 // Simulate Kafka telling us that the leader for the topic is not available override def getTopicMetadata(topics: Set[String]) = { metadataCallCount += 1 - val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode) + val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE) Map("quux" -> topicMetadata) } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala index 8a5cbc2..4dd170f 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -25,7 +25,7 @@ import kafka.cluster.Broker import kafka.common.TopicAndPartition import kafka.message.Message import kafka.message.MessageAndOffset - +import org.apache.kafka.common.protocol.Errors import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition import org.apache.samza.Partition @@ -68,7 +68,7 @@ class TestKafkaSystemConsumer { // Lie and tell the store that the partition metadata is empty. We can't // use partition metadata because it has Broker in its constructor, which // is package private to Kafka. - val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0))) + val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE))) var hosts = List[String]() var getHostPortCount = 0 val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) { http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala index 50c13ab..9cc2f63 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala @@ -20,7 +20,8 @@ package org.apache.samza.system.kafka import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + import kafka.api.TopicMetadata import org.I0Itec.zkclient.ZkClient import org.apache.samza.util.Clock @@ -30,6 +31,7 @@ import org.junit.Before import org.junit.Test import kafka.common.ErrorMapping import kafka.api.PartitionMetadata +import org.apache.kafka.common.protocol.Errors class TestTopicMetadataCache { @@ -41,8 +43,8 @@ class TestTopicMetadataCache { class MockTopicMetadataStore extends TopicMetadataStore { var mockCache = Map( - "topic1" -> new TopicMetadata("topic1", List.empty, 0), - "topic2" -> new TopicMetadata("topic2", List.empty, 0)) + "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE), + "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE)) var numberOfCalls: AtomicInteger = new AtomicInteger(0) def getTopicInfo(topics: Set[String]) = { @@ -53,7 +55,7 @@ class TestTopicMetadataCache { } def setErrorCode(topic: String, errorCode: Short) { - mockCache += topic -> new TopicMetadata(topic, List.empty, errorCode) + mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode)) } } @@ -70,7 +72,7 @@ class TestTopicMetadataCache { mockStore.setErrorCode("topic1", 3) var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) assertEquals("topic1", metadata("topic1").topic) - assertEquals(3, metadata("topic1").errorCode) + assertEquals(3, metadata("topic1").error.code) assertEquals(1, mockStore.numberOfCalls.get()) // Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache @@ -78,21 +80,21 @@ class TestTopicMetadataCache { mockStore.setErrorCode("topic1", 0) metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) assertEquals("topic1", metadata("topic1").topic) - assertEquals(0, metadata("topic1").errorCode) + assertEquals(0, metadata("topic1").error.code) assertEquals(2, mockStore.numberOfCalls.get()) // Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not // called metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) assertEquals("topic1", metadata("topic1").topic) - assertEquals(0, metadata("topic1").errorCode) + assertEquals(0, metadata("topic1").error.code) assertEquals(2, mockStore.numberOfCalls.get()) // Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called mockTime.currentValue = 11 metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) assertEquals("topic1", metadata("topic1").topic) - assertEquals(0, metadata("topic1").errorCode) + assertEquals(0, metadata("topic1").error.code) assertEquals(3, mockStore.numberOfCalls.get()) } @@ -113,7 +115,7 @@ class TestTopicMetadataCache { waitForThreadStart.await() val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis) numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1")) - numAssertionSuccess.compareAndSet(true, metadata("topic1").errorCode == 0) + numAssertionSuccess.compareAndSet(true, metadata("topic1").error.code == 0) } }) threads(i).start() @@ -127,11 +129,11 @@ class TestTopicMetadataCache { @Test def testBadErrorCodes { - val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode) - val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError) - assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode))) - assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError))) - assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError))) - assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError))) + val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE) + val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE) + assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT))) + assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE))) + assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE))) + assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE))) } } http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala index 848cfc8..3548aea 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala @@ -20,14 +20,15 @@ package org.apache.samza.utils import org.junit.Test -import org.scalatest.{ Matchers => ScalaTestMatchers } +import org.scalatest.{Matchers => ScalaTestMatchers} import org.apache.samza.util.KafkaUtil import kafka.common.ErrorMapping +import org.apache.kafka.common.protocol.Errors class TestKafkaUtil extends ScalaTestMatchers { @Test def testMaybeThrowException { - intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) } - KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode) + intercept[Exception] { KafkaUtil.maybeThrowException(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()) } + KafkaUtil.maybeThrowException(Errors.REPLICA_NOT_AVAILABLE.exception()) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 9c5dad5..97fe1f8 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -541,7 +541,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Set up kafka topics. publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); - MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG), "1000")); + MapConfig kafkaProducerConfig = new MapConfig(ImmutableMap.of(String.format("systems.%s.producer.%s", TEST_SYSTEM, ProducerConfig.MAX_BLOCK_MS_CONFIG), "1000")); MapConfig applicationRunnerConfig1 = new MapConfig(ImmutableList.of(applicationConfig1, kafkaProducerConfig)); MapConfig applicationRunnerConfig2 = new MapConfig(ImmutableList.of(applicationConfig2, kafkaProducerConfig)); LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationRunnerConfig1); http://git-wip-us.apache.org/repos/asf/samza/blob/a6540b4e/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 4ba51f3..d9261ad 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -117,9 +117,7 @@ object StreamTaskTestUtil { }) servers = configs.map(TestUtils.createServer(_)).toBuffer - - val brokerList = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT) - brokers = brokerList.split(",").map(p => "127.0.0.1" + p).mkString(",") + brokers = TestUtils.getBrokerListStrFromServers(servers, SecurityProtocol.PLAINTEXT) // setup the zookeeper and bootstrap servers for local kafka cluster jobConfig ++= Map("systems.kafka.consumer.zookeeper.connect" -> zkConnect, @@ -161,9 +159,8 @@ object StreamTaskTestUtil { topics.foreach(topic => { val topicMetadata = topicMetadataMap(topic) - val errorCode = topicMetadata.errorCode - KafkaUtil.maybeThrowException(errorCode) + KafkaUtil.maybeThrowException(topicMetadata.error.exception()) }) done = true
