DHRUV BANSAL created KAFKA-6339:
-----------------------------------

             Summary: Integration test with embedded kafka.
                 Key: KAFKA-6339
                 URL: https://issues.apache.org/jira/browse/KAFKA-6339
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.2
            Reporter: DHRUV BANSAL


I am using Kafka version 0.11.0.2.

I am trying to write an integration test for one of the components I am 
building on top of Kafka.
The following code works fine with Kafka version 0.10.0.0 but does not work 
with version 0.11.0.2:
{{
// setup Zookeeper
EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + embeddedZookeeper.port();
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000,
        ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("offsets.topic.replication.factor", "1");
String kafka_log_path =
        Files.createTempDirectory("kafka-").toAbsolutePath().toString();
System.out.println("kafka log path " + kafka_log_path);
brokerProps.setProperty("log.dirs", kafka_log_path);
brokerProps.setProperty("listeners",
        "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);

// create topic
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(),
        RackAwareMode.Disabled$.MODULE$);

// setup producer
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
producerProps.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<Integer, byte[]> producer =
        new KafkaProducer<Integer, byte[]>(producerProps);

// setup consumer
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
consumerProps.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// to make sure the consumer starts from the beginning of the topic
consumerProps.put("auto.offset.reset", "earliest");

KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TOPIC));

// send message
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42,
        "test-message".getBytes(StandardCharsets.UTF_8));
Future<RecordMetadata> record = producer.send(data);
RecordMetadata metadata = record.get();

// starting consumer
ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
assertEquals(1, records.count());
Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
ConsumerRecord<Integer, byte[]> consumedRecord = recordIterator.next();
System.out.printf("offset = %d, key = %s, value = %s",
        consumedRecord.offset(), consumedRecord.key(), consumedRecord.value());
assertEquals(42, (int) consumedRecord.key());
assertEquals("test-message",
        new String(consumedRecord.value(), StandardCharsets.UTF_8));

kafkaServer.shutdown();
zkClient.close();
embeddedZookeeper.shutdown();
}}

Please provide support for this, and please include proper documentation for 
integration testing with each release.
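
For anyone reproducing this: the test above calls consumer.poll(3000) once 
and asserts on that single result. A common pattern in tests of this kind is 
to poll in a loop until the expected number of records has arrived or a 
deadline passes. Whether that is related to the failure reported here is not 
established. The sketch below shows the loop in isolation, with no Kafka 
dependency. The class name PollUntil, the method pollUntil, and the stand-in 
poll function in main are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class PollUntil {

    /**
     * Calls pollOnce repeatedly, passing pollMs as the per-call timeout,
     * until at least `expected` items have been collected or timeoutMs
     * has elapsed. Returns whatever was collected by then.
     */
    public static <T> List<T> pollUntil(Function<Long, List<T>> pollOnce,
                                        int expected,
                                        long timeoutMs,
                                        long pollMs) {
        List<T> collected = new ArrayList<>();
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (collected.size() < expected && System.nanoTime() < deadline) {
            collected.addAll(pollOnce.apply(pollMs));
        }
        return collected;
    }

    public static void main(String[] args) {
        // Stand-in for a consumer (assumption): the first two polls return
        // nothing, the third returns one record.
        int[] calls = {0};
        List<String> got = pollUntil(ms -> {
            calls[0]++;
            return calls[0] < 3
                    ? Collections.<String>emptyList()
                    : Collections.singletonList("test-message");
        }, 1, 5000L, 100L);
        System.out.println(got + " after " + calls[0] + " polls");
    }
}
```

In the test above, the function passed in would wrap consumer.poll(...) and 
copy the returned records into a list, and the assertions would then run on 
the collected list instead of on the result of a single poll.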



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
