Repository: ignite Updated Branches: refs/heads/master 8305d64ad -> bbd325566
IGNITE-9126 Update Apache Kafka dependency - Fixes #4909. Signed-off-by: Dmitriy Pavlov <dpav...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbd32556 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbd32556 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbd32556 Branch: refs/heads/master Commit: bbd325566cc4066ebd70b58278ab33052fb5e436 Parents: 8305d64 Author: Max-Pudov <pudov....@gmail.com> Authored: Wed Oct 10 17:06:16 2018 +0300 Committer: Dmitriy Pavlov <dpav...@apache.org> Committed: Wed Oct 10 17:06:16 2018 +0300 ---------------------------------------------------------------------- modules/kafka/pom.xml | 14 ++++++++++++++ .../apache/ignite/stream/kafka/TestKafkaBroker.java | 9 ++++++--- .../stream/kafka/connect/IgniteSinkConnectorTest.java | 9 ++++++--- .../kafka/connect/IgniteSourceConnectorTest.java | 10 +++++++--- parent/pom.xml | 2 +- 5 files changed, 34 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml index 18ffcaa..63b2af2 100644 --- a/modules/kafka/pom.xml +++ b/modules/kafka/pom.xml @@ -56,6 +56,13 @@ <dependency> <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + <classifier>test</classifier> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> <scope>test</scope> @@ -73,6 +80,13 @@ <artifactId>curator-test</artifactId> <version>${curator.version}</version> <scope>test</scope> + <!-- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/143 --> + <exclusions> + <exclusion> + <artifactId>guava</artifactId> + <groupId>com.google.guava</groupId> + </exclusion> + </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java index 4f0d1d3..9b9b377 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java @@ -27,7 +27,8 @@ import java.util.Properties; import java.util.concurrent.TimeoutException; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.SystemTime$; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.common.utils.SystemTime; import kafka.utils.TestUtils; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; @@ -101,7 +102,9 @@ public class TestKafkaBroker { servers.add(kafkaSrv); - TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor, + KafkaZkClient client = kafkaSrv.zkClient(); + + TestUtils.createTopic(client, topic, partitions, replicationFactor, scala.collection.JavaConversions.asScalaBuffer(servers), new Properties()); } @@ -154,7 +157,7 @@ public class TestKafkaBroker { private void setupKafkaServer() throws IOException { kafkaCfg = new KafkaConfig(getKafkaConfig()); - kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$); + kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime()); kafkaSrv.startup(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java index 90306a7..d710a75 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; @@ -48,6 +49,7 @@ import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.FutureCallback; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; @@ -95,15 +97,16 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest { for (String topic : TOPICS) kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR); - WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); + Map<String, String> props = makeWorkerProps(); + WorkerConfig workerCfg = new StandaloneConfig(props); OffsetBackingStore offBackingStore = mock(OffsetBackingStore.class); offBackingStore.configure(workerCfg); - worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore); + worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore); worker.start(); - herder = new StandaloneHerder(worker); + herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg)); herder.start(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java index cc487aa..8717044 100644 --- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java +++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java @@ -47,11 +47,13 @@ import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneHerder; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.FutureCallback; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; @@ -98,15 +100,16 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { kafkaBroker = new TestKafkaBroker(); - WorkerConfig workerCfg = new StandaloneConfig(makeWorkerProps()); + Map<String, String> props = makeWorkerProps(); + WorkerConfig workerCfg = new StandaloneConfig(props); MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); offBackingStore.configure(workerCfg); - worker = new Worker(WORKER_ID, new SystemTime(), workerCfg, offBackingStore); + worker = new Worker(WORKER_ID, new SystemTime(), new Plugins(props), workerCfg, offBackingStore); worker.start(); - herder = new StandaloneHerder(worker); + herder = new StandaloneHerder(worker, ConnectUtils.lookupKafkaClusterId(workerCfg)); herder.start(); } @@ -250,6 +253,7 @@ public class IgniteSourceConnectorTest extends GridCommonAbstractTest { props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbd32556/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index cfc3483..8cd08ee 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -89,7 +89,7 @@ <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version> <jsonlib.version>2.4</jsonlib.version> <jtidy.version>r938</jtidy.version> - <kafka.version>0.10.0.1</kafka.version> + <kafka.version>1.1.1</kafka.version> <karaf.version>4.0.2</karaf.version> <log4j.version>2.11.0</log4j.version> <lucene.bundle.version>7.4.0_1</lucene.bundle.version>