[ https://issues.apache.org/jira/browse/KAFKA-6339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287218#comment-16287218 ]
DHRUV BANSAL commented on KAFKA-6339:
-------------------------------------
Hello Narendra, thanks for the response.
The problem is that the test keeps running forever and the same logs keep repeating.
> Integration test with embedded kafka not working
> ------------------------------------------------
>
> Key: KAFKA-6339
> URL: https://issues.apache.org/jira/browse/KAFKA-6339
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.2
> Reporter: DHRUV BANSAL
>
> I am using Kafka version 0.11.0.2 and trying to write an integration test
> for one of the components I am building on top of Kafka.
> The following code works fine with Kafka version 0.10.0.0 but does not work
> with the version mentioned above (0.11.0.2).
> {code:java}
> // setup Zookeeper
> String ZKHOST = "127.0.0.1";
> String BROKERHOST = "127.0.0.1";
> String BROKERPORT = "9093";
> String TOPIC = "test1";
> EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
> String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
> ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000,
> ZKStringSerializer$.MODULE$);
> ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
> // setup Broker
> Properties brokerProps = new Properties();
> brokerProps.setProperty("zookeeper.connect", zkConnect);
> brokerProps.setProperty("broker.id", "0");
> brokerProps.setProperty("offsets.topic.replication.factor",
> "1");
> String kafka_log_path =
> Files.createTempDirectory("kafka-").toAbsolutePath().toString();
> System.out.println("kafka log path " + kafka_log_path);
> brokerProps.setProperty("log.dirs", kafka_log_path);
> brokerProps.setProperty("listeners", "PLAINTEXT://" +
> BROKERHOST + ":" + BROKERPORT);
> KafkaConfig config = new KafkaConfig(brokerProps);
> Time mock = new MockTime();
> KafkaServer kafkaServer = TestUtils.createServer(config, mock);
> // create topic
> AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(),
> RackAwareMode.Disabled$.MODULE$);
> // setup producer
> Properties producerProps = new Properties();
> producerProps.setProperty("bootstrap.servers", BROKERHOST + ":"
> + BROKERPORT);
> producerProps.setProperty("key.serializer",
> "org.apache.kafka.common.serialization.IntegerSerializer");
> producerProps.setProperty("value.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
> KafkaProducer<Integer, byte[]> producer = new
> KafkaProducer<Integer, byte[]>(producerProps);
> // setup consumer
> Properties consumerProps = new Properties();
> consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":"
> + BROKERPORT);
> consumerProps.setProperty("group.id", "group0");
> consumerProps.setProperty("client.id", "consumer0");
> consumerProps.setProperty("key.deserializer",
> "org.apache.kafka.common.serialization.IntegerDeserializer");
> consumerProps.setProperty("value.deserializer",
> "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> // make sure the consumer starts from the beginning of the topic
> consumerProps.put("auto.offset.reset", "earliest");
> KafkaConsumer<Integer, byte[]> consumer = new
> KafkaConsumer<>(consumerProps);
> consumer.subscribe(Arrays.asList(TOPIC));
> // send message
> ProducerRecord<Integer, byte[]> data = new
> ProducerRecord<>(TOPIC, 42,
> "test-message".getBytes(StandardCharsets.UTF_8));
> Future<RecordMetadata> record = producer.send(data);
> RecordMetadata metadata = record.get();
> // starting consumer
> ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
> assertEquals(1, records.count());
> Iterator<ConsumerRecord<Integer, byte[]>> recordIterator =
> records.iterator();
> ConsumerRecord<Integer, byte[]> consumedRecord =
> recordIterator.next();
> System.out.printf("offset = %d, key = %s, value = %s",
> consumedRecord.offset(), consumedRecord.key(),
> consumedRecord.value());
> assertEquals(42, (int) consumedRecord.key());
> assertEquals("test-message", new String(consumedRecord.value(),
> StandardCharsets.UTF_8));
> kafkaServer.shutdown();
> zkClient.close();
> embeddedZookeeper.shutdown();
> {code}
> When I run this test with release 0.11.0.2, I get the following errors:
> 00:09:06.112 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0x5 zxid:0x4 txntype:-1 reqpath:n/a Error Path:/brokers
> Error:KeeperErrorCode = NoNode for /brokers
> 00:09:06.119 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0xb zxid:0x8 txntype:-1 reqpath:n/a Error Path:/config
> Error:KeeperErrorCode = NoNode for /config
> 00:09:06.126 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0x13 zxid:0xd txntype:-1 reqpath:n/a Error Path:/admin
> Error:KeeperErrorCode = NoNode for /admin
> 00:09:06.183 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0x1d zxid:0x13 txntype:-1 reqpath:n/a Error Path:/cluster
> Error:KeeperErrorCode = NoNode for /cluster
> 00:09:06.539 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:setData
> cxid:0x27 zxid:0x17 txntype:-1 reqpath:n/a Error Path:/controller_epoch
> Error:KeeperErrorCode = NoNode for /controller_epoch
> 00:09:06.684 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:delete
> cxid:0x3c zxid:0x1a txntype:-1 reqpath:n/a Error
> Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for
> /admin/preferred_replica_election
> 00:09:06.770 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0x46 zxid:0x1b txntype:-1 reqpath:n/a Error Path:/brokers
> Error:KeeperErrorCode = NodeExists for /brokers
> 00:09:06.770 [ProcessThread(sid:0 cport:50192):] INFO
> org.apache.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x1603c92dae80001 type:create
> cxid:0x47 zxid:0x1c txntype:-1 reqpath:n/a Error Path:/brokers/ids
> Error:KeeperErrorCode = NodeExists for /brokers/ids
> Please help with this issue. Integration testing with embedded Kafka should
> also be properly documented for each release.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)