Repository: kafka Updated Branches: refs/heads/trunk c42bfc0d5 -> ffd8f18a1
KAFKA-4501; Fix EasyMock and disable PowerMock tests under Java 9 - EasyMock 3.5 supports Java 9. - Fixed issues in `testFailedSendRetryLogic` and `testCreateConnectorAlreadyExists` exposed by new EasyMock version. The former was passing `anyObject` to `andReturn`, which doesn't make sense. This was leaving behind a global `any` matcher, which caused a few issues in the new version. Fixing this meant that the correlation ids had to be updated to actually match. The latter was missing a couple of expectations that the previous version of EasyMock didn't catch. - Removed unnecessary PowerMock dependency from 3 tests. - Disabled remaining PowerMock tests when running with Java 9 until https://github.com/powermock/powermock/issues/783 is in a release. - Once we merge this PR, we can enable tests in the Java 9 builds in Jenkins. Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #3845 from ijuma/kafka-4501-easymock-powermock-java-9 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd8f18a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd8f18a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd8f18a Branch: refs/heads/trunk Commit: ffd8f18a129fa826d62f527ea2c8eba2cfd644b2 Parents: c42bfc0 Author: Ismael Juma <[email protected]> Authored: Wed Sep 13 18:18:54 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Sep 13 18:18:54 2017 +0100 ---------------------------------------------------------------------- build.gradle | 22 +++++++++++++ .../file/FileStreamSinkConnectorTest.java | 25 ++++++--------- .../file/FileStreamSourceConnectorTest.java | 19 +++++------ .../connect/file/FileStreamSourceTaskTest.java | 16 +++++----- .../distributed/DistributedHerderTest.java | 2 ++ .../unit/kafka/producer/AsyncProducerTest.scala | 33 ++++++++++---------- gradle/dependencies.gradle | 2 +- 7 files changed, 69 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 62e9f08..b1950e7 100644 --- a/build.gradle +++ b/build.gradle @@ -193,6 +193,20 @@ subprojects { def testShowStandardStreams = false def testExceptionFormat = 'full' + // Exclude PowerMock tests when running with Java 9 until a version of PowerMock that supports Java 9 is released + // The relevant issue is https://github.com/powermock/powermock/issues/783 + String[] testsToExclude = [] + if (JavaVersion.current().isJava9Compatible()) { + testsToExclude = [ + "**/KafkaProducerTest.*", "**/BufferPoolTest.*", + "**/SourceTaskOffsetCommitterTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*", + "**/WorkerSourceTaskTest.*", "**/WorkerTest.*", "**/DistributedHerderTest.*", "**/WorkerCoordinatorTest.*", + "**/RestServerTest.*", "**/ConnectorPluginsResourceTest.*", "**/ConnectorsResourceTest.*", + "**/StandaloneHerderTest.*", "**/FileOffsetBakingStoreTest.*", "**/KafkaConfigBackingStoreTest.*", + "**/KafkaOffsetBackingStoreTest.*", "**/OffsetStorageWriterTest.*", "**/KafkaBasedLogTest.*" + ] + } + test { maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors() @@ -206,6 +220,7 @@ subprojects { exceptionFormat = testExceptionFormat } + exclude(testsToExclude) } task integrationTest(type: Test, dependsOn: compileJava) { @@ -220,9 +235,13 @@ subprojects { showStandardStreams = userShowStandardStreams ?: testShowStandardStreams exceptionFormat = testExceptionFormat } + useJUnit { includeCategories 'org.apache.kafka.test.IntegrationTest' } + + exclude(testsToExclude) + } task unitTest(type: Test, dependsOn: compileJava) { @@ -237,9 +256,12 @@ subprojects { showStandardStreams = userShowStandardStreams ?: testShowStandardStreams exceptionFormat = testExceptionFormat } + useJUnit { excludeCategories 'org.apache.kafka.test.IntegrationTest' } + + exclude(testsToExclude) } jar { http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java index aead7ef..c06c991 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkConnectorTest.java @@ -16,14 +16,12 @@ */ package org.apache.kafka.connect.file; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.sink.SinkConnector; +import org.easymock.EasyMockSupport; import org.junit.Before; import org.junit.Test; -import org.powermock.api.easymock.PowerMock; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,14 +29,9 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -public class FileStreamSinkConnectorTest { +public class FileStreamSinkConnectorTest extends EasyMockSupport { private static final String MULTIPLE_TOPICS = "test1,test2"; - private static final String[] MULTIPLE_TOPICS_LIST - = MULTIPLE_TOPICS.split(","); - private static final List<TopicPartition> MULTIPLE_TOPICS_PARTITIONS = Arrays.asList( - new TopicPartition("test1", 1), new TopicPartition("test2", 2) - ); private static final String FILENAME = "/afilename"; private FileStreamSinkConnector connector; @@ -48,7 +41,7 @@ public class FileStreamSinkConnectorTest { @Before public void setup() { connector = new FileStreamSinkConnector(); - ctx = PowerMock.createMock(ConnectorContext.class); + ctx = createMock(ConnectorContext.class); connector.initialize(ctx); sinkProperties = new HashMap<>(); @@ -58,7 +51,7 @@ public class FileStreamSinkConnectorTest { @Test public void testSinkTasks() { - PowerMock.replayAll(); + replayAll(); connector.start(sinkProperties); List<Map<String, String>> taskConfigs = connector.taskConfigs(1); @@ -71,12 +64,12 @@ public class FileStreamSinkConnectorTest { assertEquals(FILENAME, taskConfigs.get(0).get(FileStreamSinkConnector.FILE_CONFIG)); } - PowerMock.verifyAll(); + verifyAll(); } @Test public void testSinkTasksStdout() { - PowerMock.replayAll(); + replayAll(); sinkProperties.remove(FileStreamSourceConnector.FILE_CONFIG); connector.start(sinkProperties); @@ -84,16 +77,16 @@ public class FileStreamSinkConnectorTest { assertEquals(1, taskConfigs.size()); assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); - PowerMock.verifyAll(); + verifyAll(); } @Test public void testTaskClass() { - PowerMock.replayAll(); + replayAll(); connector.start(sinkProperties); assertEquals(FileStreamSinkTask.class, connector.taskClass()); - PowerMock.verifyAll(); + verifyAll(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java index f1fb4ef..69a94a8 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceConnectorTest.java @@ -18,9 +18,10 @@ package org.apache.kafka.connect.file; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.errors.ConnectException; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.junit.Before; import org.junit.Test; -import org.powermock.api.easymock.PowerMock; import java.util.HashMap; import java.util.List; @@ -29,7 +30,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -public class FileStreamSourceConnectorTest { +public class FileStreamSourceConnectorTest extends EasyMockSupport { private static final String SINGLE_TOPIC = "test"; private static final String MULTIPLE_TOPICS = "test1,test2"; @@ -42,7 +43,7 @@ public class FileStreamSourceConnectorTest { @Before public void setup() { connector = new FileStreamSourceConnector(); - ctx = PowerMock.createMock(ConnectorContext.class); + ctx = createMock(ConnectorContext.class); connector.initialize(ctx); sourceProperties = new HashMap<>(); @@ -52,7 +53,7 @@ public class FileStreamSourceConnectorTest { @Test public void testSourceTasks() { - PowerMock.replayAll(); + replayAll(); connector.start(sourceProperties); List<Map<String, String>> taskConfigs = connector.taskConfigs(1); @@ -70,12 +71,12 @@ public class FileStreamSourceConnectorTest { assertEquals(SINGLE_TOPIC, taskConfigs.get(0).get(FileStreamSourceConnector.TOPIC_CONFIG)); - PowerMock.verifyAll(); + verifyAll(); } @Test public void testSourceTasksStdin() { - PowerMock.replayAll(); + EasyMock.replay(ctx); sourceProperties.remove(FileStreamSourceConnector.FILE_CONFIG); connector.start(sourceProperties); @@ -83,7 +84,7 @@ public class FileStreamSourceConnectorTest { assertEquals(1, taskConfigs.size()); assertNull(taskConfigs.get(0).get(FileStreamSourceConnector.FILE_CONFIG)); - PowerMock.verifyAll(); + EasyMock.verify(ctx); } @Test(expected = ConnectException.class) @@ -94,11 +95,11 @@ public class FileStreamSourceConnectorTest { @Test public void testTaskClass() { - PowerMock.replayAll(); + EasyMock.replay(ctx); connector.start(sourceProperties); assertEquals(FileStreamSourceTask.class, connector.taskClass()); - PowerMock.verifyAll(); + EasyMock.verify(ctx); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java index 03fb774..cde6c43 100644 --- a/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java +++ b/connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSourceTaskTest.java @@ -21,10 +21,10 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.powermock.api.easymock.PowerMock; import java.io.ByteArrayInputStream; import java.io.File; @@ -37,7 +37,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; -public class FileStreamSourceTaskTest { +public class FileStreamSourceTaskTest extends EasyMockSupport { private static final String TOPIC = "test"; @@ -56,8 +56,8 @@ public class FileStreamSourceTaskTest { config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsolutePath()); config.put(FileStreamSourceConnector.TOPIC_CONFIG, TOPIC); task = new FileStreamSourceTask(); - offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); - context = PowerMock.createMock(SourceTaskContext.class); + offsetStorageReader = createMock(OffsetStorageReader.class); + context = createMock(SourceTaskContext.class); task.initialize(context); } @@ -66,11 +66,11 @@ public class FileStreamSourceTaskTest { tempFile.delete(); if (verifyMocks) - PowerMock.verifyAll(); + verifyAll(); } private void replay() { - PowerMock.replayAll(); + replayAll(); verifyMocks = true; } @@ -164,6 +164,6 @@ public class FileStreamSourceTaskTest { private void expectOffsetLookupReturnNone() { EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); - EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null); + EasyMock.expect(offsetStorageReader.offset(EasyMock.<Map<String, String>>anyObject())).andReturn(null); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 7834a89..dcbd88f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -481,6 +481,8 @@ public class DistributedHerderTest { @Test public void testCreateConnectorAlreadyExists() throws Exception { EasyMock.expect(member.memberId()).andStubReturn("leader"); + EasyMock.expect(worker.getPlugins()).andReturn(plugins); + EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null); expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); expectPostRebalanceCatchup(SNAPSHOT); http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 6e7353c..376c71f 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -372,9 +372,9 @@ class AsyncProducerTest { val props = new Properties() props.put("metadata.broker.list", brokerList) props.put("request.required.acks", "1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) - props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) - props.put("producer.num.retries", 3.toString) + props.put("serializer.class", classOf[StringEncoder].getName) + props.put("key.serializer.class", classOf[NullEncoder[Int]].getName) + props.put("producer.num.retries", "3") val config = new ProducerConfig(props) @@ -391,26 +391,27 @@ class AsyncProducerTest { // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, - correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) + correlationId = 5, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, - correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) + correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21, + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 15, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) // don't care about config mock - EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes() + val mockConfig = EasyMock.createNiceMock(classOf[SyncProducerConfig]) + EasyMock.expect(mockSyncProducer.config).andReturn(mockConfig).anyTimes() EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1) EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2) EasyMock.replay(mockSyncProducer) val producerPool = EasyMock.createMock(classOf[ProducerPool]) - EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(4) + EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer).times(3) EasyMock.expect(producerPool.close()) EasyMock.replay(producerPool) val time = new Time { @@ -419,14 +420,14 @@ class AsyncProducerTest { override def sleep(ms: Long): Unit = {} override def hiResClockMs: Long = 0L } - val handler = new DefaultEventHandler[Int,String](config, - partitioner = new FixedValuePartitioner(), - encoder = new StringEncoder(), - keyEncoder = new NullEncoder[Int](), - producerPool = producerPool, - topicPartitionInfos = topicPartitionInfos, - time = time) - val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m)) + val handler = new DefaultEventHandler(config, + partitioner = new FixedValuePartitioner(), + encoder = new StringEncoder(), + keyEncoder = new NullEncoder[Int](), + producerPool = producerPool, + topicPartitionInfos = topicPartitionInfos, + time = time) + val data = msgs.map(m => new KeyedMessage(topic1, 0, m)) ++ msgs.map(m => new KeyedMessage(topic1, 1, m)) handler.handle(data) handler.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd8f18a/gradle/dependencies.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 65f4575..3b95100 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -51,7 +51,7 @@ versions += [ apacheds: "2.0.0-M24", argparse4j: "0.7.0", bcpkix: "1.58", - easymock: "3.4", + easymock: "3.5", jackson: "2.9.1", jetty: "9.2.22.v20170606", jersey: "2.25.1",
