Repository: crunch
Updated Branches:
  refs/heads/master c09c4ee2d -> 360d72a4f
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java new file mode 100644 index 0000000..ce97ec1 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka; + + +import kafka.api.OffsetRequest; +import org.apache.crunch.Pair; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.crunch.kafka.ClusterTest.writeData; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaRecordsIterableIT { + + @Mock + private Consumer<String, String> mockedConsumer; + + @Mock + private ConsumerRecords<String, String> records; + + @Rule + public TestName testName = new TestName(); + + private String topic; + private Map<TopicPartition, Long> startOffsets; + private Map<TopicPartition, Long> stopOffsets; + private Map<TopicPartition, Pair<Long, Long>> offsets; + private Consumer<String, String> consumer; + private Properties props; + private Properties consumerProps; + + @BeforeClass + public static void init() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setup() { + topic = testName.getMethodName(); + + props = ClusterTest.getConsumerProperties(); + + 
startOffsets = new HashMap<>(); + stopOffsets = new HashMap<>(); + offsets = new HashMap<>(); + for (int i = 0; i < 4; i++) { + TopicPartition tp = new TopicPartition(topic, i); + startOffsets.put(tp, 0L); + stopOffsets.put(tp, 100L); + + offsets.put(tp, Pair.of(0L, 100L)); + } + + + consumerProps = new Properties(); + consumerProps.putAll(props); + } + + @After + public void shutdown() { + } + + + @Test(expected = IllegalArgumentException.class) + public void nullConsumer() { + new KafkaRecordsIterable(null, offsets, new Properties()); + } + + @Test(expected = IllegalArgumentException.class) + public void nullOffsets() { + new KafkaRecordsIterable<>(consumer, null, new Properties()); + } + + @Test(expected=IllegalArgumentException.class) + public void emptyOffsets() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, + Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties()); + } + + @Test(expected = IllegalArgumentException.class) + public void nullProperties() { + new KafkaRecordsIterable(consumer, offsets, null); + } + + @Test + public void iterateOverValues() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 10; + int numPerLoop = 100; + int total = loops * numPerLoop; + List<String> keys = writeData(props, topic, "batch", loops, numPerLoop); + + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStopOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + assertThat(keys, hasItem(event.first())); + assertTrue(keys.remove(event.first())); + count++; + } + + assertThat(count, is(total)); + assertThat(keys.size(), is(0)); + } + + @Test + public void iterateOverOneValue() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 1; + int numPerLoop = 1; + int total = loops * numPerLoop; + List<String> keys = writeData(props, topic, "batch", loops, numPerLoop); + + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStopOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + System.out.println(entry.getKey()+ "start:"+entry.getValue()+":end:"+stopOffsets.get(entry.getKey())); + } + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + assertThat(keys, hasItem(event.first())); + assertTrue(keys.remove(event.first())); + count++; + } + + assertThat(count, is(total)); + assertThat(keys.size(), is(0)); + } + + @Test + public void iterateOverNothing() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 10; + int 
numPerLoop = 100; + writeData(props, topic, "batch", loops, numPerLoop); + + //set the start offsets equal to the stop so won't iterate over anything + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStartOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + count++; + } + + assertThat(count, is(0)); + } + + @Test + public void iterateOverPartial() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 10; + int numPerLoop = 100; + int numPerPartition = 50; + + writeData(props, topic, "batch", loops, numPerLoop); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + //set the start offsets equal to the stop so won't iterate over anything + startOffsets = getStartOffsets(props, topic); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition)); + } + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + count++; + } + + assertThat(count, is(startOffsets.size() * numPerPartition)); + } + + @Test + public void dontIteratePastStop() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 10; + int numPerLoop = 100; + + List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop); + + //set the start offsets equal to the stop so won't iterate over anything + startOffsets = getStartOffsets(props, topic); + stopOffsets = getStopOffsets(props, topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop); + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + assertThat(keys, hasItem(event.first())); + assertTrue(keys.remove(event.first())); + assertThat(secondKeys, not(hasItem(event.first()))); + count++; + } + + assertThat(count, is(loops * numPerLoop)); + assertThat(keys.size(), is(0)); + } + + @Test + public void iterateSkipInitialValues() { + consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe()); + int loops = 10; + int numPerLoop = 100; + + List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop); + + //set the start offsets equal to the stop so won't iterate over anything + startOffsets = getStopOffsets(props, topic); + + List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop); + + stopOffsets = getStopOffsets(props, topic); + + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + offsets.put(entry.getKey(), 
Pair.of(entry.getValue(), stopOffsets.get(entry.getKey()))); + } + + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, + new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + assertThat(secondKeys, hasItem(event.first())); + assertTrue(secondKeys.remove(event.first())); + assertThat(keys, not(hasItem(event.first()))); + count++; + } + + assertThat(count, is(loops * numPerLoop)); + assertThat(secondKeys.size(), is(0)); + } + + @Test + public void iterateValuesWithExceptions() { + List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>(); + + for(int i = 0; i < 25; i++){ + returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null)); + returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null)); + returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null)); + returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null)); + } + + offsets = new HashMap<>(); + offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L)); + offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L)); + offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L)); + offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L)); + + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(returnedRecords.iterator()); + when(mockedConsumer.poll(Matchers.anyLong())) + //request for the first poll + .thenReturn(null) + //fail twice + .thenThrow(new TimeoutException("fail1")) + .thenThrow(new TimeoutException("fail2")) + //request that will give data + .thenReturn(records) + // shows to stop retrieving data + .thenReturn(null); + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + count++; + } + + //should have gotten one value per topicpartition + assertThat(count, is(returnedRecords.size())); + } + + @Test + public void iterateValuesAfterStopOffsets() { + List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>(); + for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) { + returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(), + entry.getKey().partition(), entry.getValue() + 1, "key", null)); + } + + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(returnedRecords.iterator()); + when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null); + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties()); + + int count = 0; + for (Pair<String, String> event : data) { + count++; + } + + assertThat(count, is(0)); + + } + + @Test(expected = RetriableException.class) + public void iterateRetriableExceptionMaxExceeded() { + List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>(); + for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) { + returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(), + entry.getKey().partition(), entry.getValue() + 1, "key", null)); + } + + when(records.isEmpty()).thenReturn(false); + when(records.iterator()).thenReturn(returnedRecords.iterator()); + when(mockedConsumer.poll(Matchers.anyLong())) + //for the fill poll call + .thenReturn(null) + //retry 5 times then fail + .thenThrow(new TimeoutException("fail1")) + .thenThrow(new 
TimeoutException("fail2")) + .thenThrow(new TimeoutException("fail3")) + .thenThrow(new TimeoutException("fail4")) + .thenThrow(new TimeoutException("fail5")) + .thenThrow(new TimeoutException("fail6")); + + Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties()); + + data.iterator().next(); + } + + private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic); + } + + private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) { + return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java new file mode 100644 index 0000000..3800c24 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import kafka.api.OffsetRequest; +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.Pipeline; +import org.apache.crunch.TableSource; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.matchers.JUnitMatchers.hasItem; + +public class KafkaSourceIT { + + @Rule + public TemporaryPath path = new TemporaryPath(); + + @Rule + public TestName testName = new TestName(); + + private Properties consumerProps; + private Configuration config; + private String topic; + + @BeforeClass + public static void setup() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setupTest() { + topic = testName.getMethodName(); + consumerProps = ClusterTest.getConsumerProperties(); + config = ClusterTest.getConsumerConfig(); + } + + @Test + public void sourceReadData() { + List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); + Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); + Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + Long endingOffset = endOffsets.get(entry.getKey()); + offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); + } + + Configuration config = ClusterTest.getConf(); + + Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config); + pipeline.enableDebug(); + + TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets); + + PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource); + + Set<String> keysRead = new HashSet<>(); + int numRecordsFound = 0; + for (Pair<BytesWritable, BytesWritable> values : read.materialize()) { + assertThat(keys, hasItem(new String(values.first().getBytes()))); + numRecordsFound++; + keysRead.add(new String(values.first().getBytes())); + } + + assertThat(numRecordsFound, is(keys.size())); + assertThat(keysRead.size(), is(keys.size())); + + pipeline.done(); + } + + + @Test + public void sourceReadDataThroughPipeline() { + List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); + Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); + Map<TopicPartition, Long> 
endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + Long endingOffset = endOffsets.get(entry.getKey()); + offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); + } + + Configuration config = ClusterTest.getConf(); + + Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config); + pipeline.enableDebug(); + + TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets); + + PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource); + Path out = path.getPath("out"); + read.parallelDo(new SimpleConvertFn(), Avros.strings()).write(To.textFile(out)); + + pipeline.run(); + + PCollection<String> persistedKeys = pipeline.read(From.textFile(out)); + + Set<String> keysRead = new HashSet<>(); + int numRecordsFound = 0; + for (String value : persistedKeys.materialize()) { + assertThat(keys, hasItem(value)); + numRecordsFound++; + keysRead.add(value); + } + + assertThat(numRecordsFound, is(keys.size())); + assertThat(keysRead.size(), is(keys.size())); + + pipeline.done(); + } + + + private static class SimpleConvertFn extends MapFn<Pair<BytesWritable, BytesWritable>, String> { + @Override + public String map(Pair<BytesWritable, BytesWritable> input) { + return new String(input.first().getBytes()); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java new file mode 100644 index 0000000..38c3fce --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.kafka; + +import kafka.cluster.Broker; +import org.apache.crunch.kafka.ClusterTest; +import org.apache.crunch.kafka.KafkaUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + + +public class KafkaUtilsIT { + + @Rule + public TestName testName = new TestName(); + + private String topic; + private static Broker broker; + + @BeforeClass + public static void startup() throws Exception { + ClusterTest.startTest(); + + Properties props = ClusterTest.getConsumerProperties(); + String brokerHostPorts = props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + + String brokerHostPortString = brokerHostPorts.split(",")[0]; + String[] brokerHostPort = brokerHostPortString.split(":"); + + String brokerHost = brokerHostPort[0]; + int brokerPort = Integer.parseInt(brokerHostPort[1]); + + broker = new Broker(0, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT); + } + + @AfterClass + public static void shutdown() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setup() throws IOException { + topic = "topic-" + testName.getMethodName(); + } + + @Test + public void getKafkaProperties() { + Configuration config = new Configuration(false); + String propertyKey = "fake.kafka.property"; + String propertyValue = testName.getMethodName(); + config.set(propertyKey, propertyValue); + + Properties props = KafkaUtils.getKafkaConnectionProperties(config); + assertThat(props.get(propertyKey), is((Object) propertyValue)); + } + + @Test + public void addKafkaProperties() { + String propertyKey = "fake.kafka.property"; + String propertyValue = testName.getMethodName(); + + Properties props = new Properties(); + props.setProperty(propertyKey, propertyValue); + + Configuration config = new Configuration(false); + + KafkaUtils.addKafkaConnectionProperties(props, config); + assertThat(config.get(propertyKey), is(propertyValue)); + } + + + @Test(expected = IllegalArgumentException.class) + public void getBrokerOffsetsKafkaNullProperties() throws IOException { + KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic); + } + + @Test(expected = IllegalArgumentException.class) + public void getBrokerOffsetsKafkaNullTopics() throws IOException { + KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null); + } + + @Test(expected = IllegalArgumentException.class) + public void getBrokerOffsetsKafkaEmptyTopics() throws IOException { + KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime()); + } + + @Test(timeout = 10000) + public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException { + ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4); + while (true) { + Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), + 
kafka.api.OffsetRequest.LatestTime(), topic); + + assertNotNull(offsets); + assertThat(offsets.size(), is(4)); + boolean allMatch = true; + for (int i = 0; i < 4; i++) { + TopicPartition tp = new TopicPartition(topic, i); + assertThat(offsets.keySet(), hasItem(tp)); + allMatch &= (offsets.get(tp) == 1L); + } + if (allMatch) { + break; + } + Thread.sleep(100L); + } + } + + @Test + public void getEarliestBrokerOffsetsKafka() throws IOException { + ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1); + + Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), + kafka.api.OffsetRequest.EarliestTime(), topic); + + assertNotNull(offsets); + //default create 4 topics + assertThat(offsets.size(), is(4)); + for (int i = 0; i < 4; i++) { + assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i))); + assertThat(offsets.get(new TopicPartition(topic, i)), is(0L)); + } + } + + @Test + public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException { + ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4); + + // A time of 1L (1 ms after epoch) should be before the topic was created + Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic); + + assertNotNull(offsets); + //default create 4 topics + assertThat(offsets.size(), is(4)); + for (int i = 0; i < 4; i++) { + assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i))); + assertThat(offsets.get(new TopicPartition(topic, i)), is(0L)); + } + } + + @Test(expected = IllegalStateException.class) + public void getBrokerOffsetsNoHostAvailable() throws IOException { + Properties testProperties = ClusterTest.getConsumerProperties(); + testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000"); + testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000"); + assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic)); + } + + @Test + public void getBrokerOffsetsSomeHostsUnavailable() throws IOException { + final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT); + assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic)); + assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic)); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java new file mode 100644 index 0000000..d760a02 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.inputformat; + + +import kafka.api.OffsetRequest; +import org.apache.crunch.Pair; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.kafka.ClusterTest; +import org.apache.crunch.kafka.KafkaSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.matchers.JUnitMatchers.hasItem; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaInputFormatIT { + + @Rule + public TestName testName = new TestName(); + + @Mock + private TaskAttemptContext taskContext; + + @Mock + private FormatBundle bundle; + private Properties consumerProps; + private Configuration config; + private String topic; + + @BeforeClass + public static void setup() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + @Before + public void setupTest() { + topic = testName.getMethodName(); + consumerProps = ClusterTest.getConsumerProperties(); + + consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + + config = ClusterTest.getConsumerConfig(); + + config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + } + + @Test + public void getSplitsFromFormat() throws IOException, InterruptedException { + List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); + Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); + Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); + + Map<TopicPartition, Pair<Long, 
Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + Long endingOffset = endOffsets.get(entry.getKey()); + offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); + } + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + KafkaInputFormat inputFormat = new KafkaInputFormat(); + inputFormat.setConf(config); + List<InputSplit> splits = inputFormat.getSplits(null); + + assertThat(splits.size(), is(offsets.size())); + + for (InputSplit split : splits) { + KafkaInputSplit inputSplit = (KafkaInputSplit) split; + Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition()); + assertThat(inputSplit.getStartingOffset(), is(startEnd.first())); + assertThat(inputSplit.getEndingOffset(), is(startEnd.second())); + } + } + + @Test + public void getSplitsSameStartEnd() throws IOException, InterruptedException { + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for(int i = 0; i < 10; i++) { + offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i)); + } + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + KafkaInputFormat inputFormat = new KafkaInputFormat(); + inputFormat.setConf(config); + List<InputSplit> splits = inputFormat.getSplits(null); + + assertThat(splits.size(), is(0)); + } + + @Test + public void getSplitsCreateReaders() throws IOException, InterruptedException { + List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); + Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); + Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + Long endingOffset = endOffsets.get(entry.getKey()); + offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); + } + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + KafkaInputFormat inputFormat = new KafkaInputFormat(); + inputFormat.setConf(config); + List<InputSplit> splits = inputFormat.getSplits(null); + + assertThat(splits.size(), is(offsets.size())); + + for (InputSplit split : splits) { + KafkaInputSplit inputSplit = (KafkaInputSplit) split; + Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition()); + assertThat(inputSplit.getStartingOffset(), is(startEnd.first())); + assertThat(inputSplit.getEndingOffset(), is(startEnd.second())); + } + + //create readers and consume the data + when(taskContext.getConfiguration()).thenReturn(config); + Set<String> keysRead = new HashSet<>(); + //read all data from all splits + for (InputSplit split : splits) { + KafkaInputSplit inputSplit = (KafkaInputSplit) split; + long start = inputSplit.getStartingOffset(); + long end = inputSplit.getEndingOffset(); + + RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext); + recordReader.initialize(split, taskContext); + + int numRecordsFound = 0; + while (recordReader.nextKeyValue()) { + keysRead.add(new String(recordReader.getCurrentKey().getBytes())); + assertThat(keys, hasItem(new String(recordReader.getCurrentKey().getBytes()))); + assertThat(recordReader.getCurrentValue(), is(notNullValue())); + numRecordsFound++; + } + recordReader.close(); + + //assert that it encountered a partitions worth of data + assertThat(((long) 
numRecordsFound), is(end - start)); + } + + //validate the same number of unique keys was read as were written. + assertThat(keysRead.size(), is(keys.size())); + } + + @Test + public void writeOffsetsToFormatBundle() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + String topic = testName.getMethodName(); + int numPartitions = 10; + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + + KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); + + ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); + + //number of Partitions * 2 for start and end + 1 for the topic + verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture()); + + List<String> keyValues = keyCaptor.getAllValues(); + List<String> valueValues = valueCaptor.getAllValues(); + + String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); + assertThat(keyValues, hasItem(partitionKey)); + + String partitions = valueValues.get(keyValues.indexOf(partitionKey)); + List<String> parts = Arrays.asList(partitions.split(",")); + + for (int i = 0; i < numPartitions; i++) { + assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); + String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); + String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); + assertThat(keyValues, hasItem(startKey)); + assertThat(keyValues, hasItem(endKey)); + assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); + assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); + assertThat(parts, hasItem(Long.toString(i))); + } + } + + @Test + public void writeOffsetsToFormatBundleSpecialCharacters() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + String topic = "partitions." 
+ testName.getMethodName(); + int numPartitions = 10; + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + + KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); + + ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); + + //number of Partitions * 2 for start and end + 1 for the topic + verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture()); + + List<String> keyValues = keyCaptor.getAllValues(); + List<String> valueValues = valueCaptor.getAllValues(); + + String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); + assertThat(keyValues, hasItem(partitionKey)); + + String partitions = valueValues.get(keyValues.indexOf(partitionKey)); + List<String> parts = Arrays.asList(partitions.split(",")); + + for (int i = 0; i < numPartitions; i++) { + assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); + String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); + String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); + assertThat(keyValues, hasItem(startKey)); + assertThat(keyValues, hasItem(endKey)); + assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); + assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); + assertThat(parts, hasItem(Long.toString(i))); + } + } + + @Test + public void writeOffsetsToFormatBundleMultipleTopics() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + Set<String> topics = new HashSet<>(); + + int numPartitions = 10; + int numTopics = 10; + for (int j = 0; j < numTopics; j++) { + String topic = testName.getMethodName() + j; + topics.add(topic); + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + } + + KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); + + ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class); + + //number of Partitions * 2 for start and end + num of topics + verify(bundle, times((numTopics * numPartitions * 2) + numTopics)).set(keyCaptor.capture(), valueCaptor.capture()); + + List<String> keyValues = keyCaptor.getAllValues(); + List<String> valueValues = valueCaptor.getAllValues(); + + for (String topic : topics) { + + String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic); + assertThat(keyValues, hasItem(partitionKey)); + + String partitions = valueValues.get(keyValues.indexOf(partitionKey)); + List<String> parts = Arrays.asList(partitions.split(",")); + + for (int i = 0; i < numPartitions; i++) { + assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic))); + String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i); + String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i); + assertThat(keyValues, hasItem(startKey)); + assertThat(keyValues, hasItem(endKey)); + assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i))); + assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L))); + assertThat(parts, hasItem(Long.toString(i))); + } + } + } + + @Test + public void getOffsetsFromConfig() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + 
Set<String> topics = new HashSet<>(); + + int numPartitions = 10; + int numTopics = 10; + for (int j = 0; j < numTopics; j++) { + String topic = testName.getMethodName() + ".partitions" + j; + topics.add(topic); + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + } + + Configuration config = new Configuration(false); + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config); + + assertThat(returnedOffsets.size(), is(returnedOffsets.size())); + for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) { + Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey()); + assertThat(valuePair, is(entry.getValue())); + } + } + + @Test(expected=IllegalStateException.class) + public void getOffsetsFromConfigMissingStart() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + Set<String> topics = new HashSet<>(); + + int numPartitions = 10; + int numTopics = 10; + for (int j = 0; j < numTopics; j++) { + String topic = testName.getMethodName() + ".partitions" + j; + topics.add(topic); + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + } + + Configuration config = new Configuration(false); + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.start"); + + Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config); + } + + @Test(expected=IllegalStateException.class) + public void getOffsetsFromConfigMissingEnd() { + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + Set<String> topics = new HashSet<>(); + + int numPartitions = 10; + int numTopics = 10; + for (int j = 0; j < numTopics; j++) { + String topic = testName.getMethodName() + ".partitions" + j; + topics.add(topic); + for (int i = 0; i < numPartitions; i++) { + TopicPartition tAndP = new TopicPartition(topic, i); + offsets.put(tAndP, Pair.of((long) i, i * 10L)); + } + } + + Configuration config = new Configuration(false); + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.end"); + + Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java new file mode 100644 index 0000000..3833e9d --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.inputformat; + +import kafka.api.OffsetRequest; +import org.apache.kafka.common.TopicPartition; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class KafkaInputSplitTest { + + @Rule + public TestName testName = new TestName(); + + @Test + public void createSplit() throws IOException, InterruptedException { + String topic = testName.getMethodName(); + int partition = 18; + long startingOffet = 10; + long endingOffset = 23; + + + KafkaInputSplit split = new KafkaInputSplit(topic, partition, startingOffet, endingOffset); + assertThat(split.getStartingOffset(), is(startingOffet)); + assertThat(split.getEndingOffset(), is(endingOffset)); + assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition))); + assertThat(split.getLength(), is(endingOffset - startingOffet)); + assertThat(split.getLocations(), is(new String[0])); + } + + @Test + public void createSplitEarliestOffset() throws IOException, InterruptedException { + String topic = testName.getMethodName(); + int partition = 18; + long endingOffset = 23; + + KafkaInputSplit split = new KafkaInputSplit(topic, partition, -1L, endingOffset); + assertThat(split.getStartingOffset(), is(-1L)); + assertThat(split.getEndingOffset(), is(endingOffset)); + assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition))); + assertThat(split.getLength(), is(endingOffset)); + assertThat(split.getLocations(), is(new String[0])); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java new file mode 100644 index 0000000..ba5b65b --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.inputformat; + +import kafka.api.OffsetRequest; +import org.apache.crunch.Pair; +import org.apache.crunch.kafka.ClusterTest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.kafka.common.TopicPartition; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.matchers.JUnitMatchers.hasItem; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaRecordReaderIT { + + @Mock + private TaskAttemptContext context; + + @Rule + public TestName testName = new TestName(); + private Properties consumerProps; + private Configuration config; + + @BeforeClass + public static void setup() throws Exception { + ClusterTest.startTest(); + } + + @AfterClass + public static void cleanup() throws Exception { + ClusterTest.endTest(); + } + + private String topic; + + @Before + public void setupTest() { + topic = testName.getMethodName(); + consumerProps = ClusterTest.getConsumerProperties(); + config = ClusterTest.getConsumerConfig(); + when(context.getConfiguration()).thenReturn(config); + } + + @Test + public void readData() throws IOException, InterruptedException { + List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10); + + Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic); + Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic); + + Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); + for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) { + Long endingOffset = endOffsets.get(entry.getKey()); + offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset)); + } + + KafkaInputFormat.writeOffsetsToConfiguration(offsets, config); + + Set<String> keysRead = new HashSet<>(); + //read all data from all splits + for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) { + KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(), + partitionInfo.getValue().first(), partitionInfo.getValue().second()); + + KafkaRecordReader<String, String> recordReader = new KafkaRecordReader<>(); + recordReader.initialize(split, context); + + int numRecordsFound = 0; + while (recordReader.nextKeyValue()) { + keysRead.add(recordReader.getCurrentKey()); + assertThat(keys, hasItem(recordReader.getCurrentKey())); + assertThat(recordReader.getCurrentValue(), is(notNullValue())); + numRecordsFound++; + } + recordReader.close(); + + //assert that it encountered a partitions worth of data + assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - 
partitionInfo.getValue().first())); + } + + //validate the same number of unique keys was read as were written. + assertThat(keysRead.size(), is(keys.size())); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java new file mode 100644 index 0000000..ede3cf0 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * <p> + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.utils; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * Embedded Zookeeper instance for testing purposes. 
+ * <p> + * Adapted from the {@code kafka.zk.EmbeddedZookeeper} class. + * </p> + */ +class EmbeddedZookeeper { + + private final File snapshotDir; + private final File logDir; + private final NIOServerCnxnFactory factory; + + /** + * Constructs an embedded Zookeeper instance. + * + * @param connectString Zookeeper connection string. + * + * @throws IOException if an error occurs during Zookeeper initialization. + */ + public EmbeddedZookeeper(String connectString) throws IOException { + this.snapshotDir = KafkaTestUtils.getTempDir(); + this.logDir = KafkaTestUtils.getTempDir(); + this.factory = new NIOServerCnxnFactory(); + String hostname = connectString.split(":")[0]; + int port = Integer.valueOf(connectString.split(":")[1]); + int maxClientConnections = 1024; + factory.configure(new InetSocketAddress(hostname, port), maxClientConnections); + try { + int tickTime = 500; + factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime)); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Shuts down the embedded Zookeeper instance. + */ + public void shutdown() throws IOException { + factory.shutdown(); + FileUtils.deleteDirectory(snapshotDir); + FileUtils.deleteDirectory(logDir); + } +} + http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java new file mode 100644 index 0000000..f47f168 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.kafka.utils; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.Time; +import org.apache.commons.io.FileUtils; +import scala.Option; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static scala.collection.JavaConversions.asJavaIterable; + +/** + * A test harness that brings up some number of Kafka broker nodes. + * <p> + * Adapted from the {@code kafka.integration.KafkaServerTestHarness} class. + * </p> + */ +public class KafkaBrokerTestHarness extends ZookeeperTestHarness { + + /** + * Producer send acknowledgment timeout in milliseconds. + */ + public static final String KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS = "request.timeout.ms"; + + /** + * Producer send retry maximum count. 
+ */ + public static final String KAFKA_PRODUCER_RETRY_MAX = "message.send.max.retries"; + + /** + * Producer send retry backoff interval in milliseconds. + */ + public static final String KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS = "retry.backoff.ms"; + + /** + * Comma-delimited Kafka Zookeeper quorum list. + */ + public static final String KAFKA_ZOOKEEPERS = "zookeeper.connect"; + + /** + * Comma-delimited list of Kafka brokers, for producer bootstrapping purposes. + */ + public static final String KAFKA_BROKERS = "metadata.broker.list"; + + /** + * Default number of brokers in the Kafka cluster. + */ + public static final int DEFAULT_BROKERS = 1; + + /** + * Default number of partitions per Kafka topic. + */ + public static final int PARTITIONS_PER_TOPIC = 4; + + private List<KafkaConfig> brokerConfigs; + private List<KafkaServer> brokers; + private File clientConfig; + private boolean setUp; + private boolean tornDown; + + /** + * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers. + */ + public KafkaBrokerTestHarness() { + this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0]); + } + + /** + * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers and the supplied + * {@link Properties} which will be applied to the brokers. + * + * @param properties + * the additional {@link Properties} supplied to the brokers + * @throws IllegalArgumentException + * if {@code properties} is {@code null} + */ + public KafkaBrokerTestHarness(Properties properties) { + this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0], properties); + } + + /** + * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port. + * + * @param brokers Number of Kafka brokers to start up. + * @param zookeeperPort The port number to use for Zookeeper client connections. + * + * @throws IllegalArgumentException if {@code brokers} is less than 1. + */ + public KafkaBrokerTestHarness(int brokers, int zookeeperPort) { + this(getBrokerConfig(brokers, zookeeperPort), zookeeperPort); + } + + /** + * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port. + * + * @param brokers + * Number of Kafka brokers to start up. + * @param zookeeperPort + * The port number to use for Zookeeper client connections. + * @param properties + * the additional {@link Properties} supplied to the brokers + * + * @throws IllegalArgumentException + * if {@code brokers} is less than 1 or if {@code baseProperties} is {@code null} + */ + public KafkaBrokerTestHarness(int brokers, int zookeeperPort, Properties properties) { + this(getBrokerConfig(brokers, zookeeperPort, properties), zookeeperPort); + } + + /** + * Creates a new Kafka broker test harness using the given broker configuration properties and Zookeeper port. + * + * @param brokerConfigs List of Kafka broker configurations. + * @param zookeeperPort The port number to use for Zookeeper client connections. + * + * @throws IllegalArgumentException if {@code brokerConfigs} is {@code null} or empty. + */ + public KafkaBrokerTestHarness(List<KafkaConfig> brokerConfigs, int zookeeperPort) { + super(zookeeperPort); + if (brokerConfigs == null || brokerConfigs.isEmpty()) { + throw new IllegalArgumentException("Must supply at least one broker configuration."); + } + this.brokerConfigs = brokerConfigs; + this.brokers = null; + this.setUp = false; + this.tornDown = false; + } + + /** + * Start up the Kafka broker cluster. 
+ * + * @throws IOException if an error occurs during Kafka broker startup. + * @throws IllegalStateException if the Kafka broker cluster has already been {@link #setUp() setup}. + */ + @Override + public void setUp() throws IOException { + if (setUp) { + throw new IllegalStateException("Already setup, cannot setup again"); + } + setUp = true; + + // Start up zookeeper. + super.setUp(); + + brokers = new ArrayList<KafkaServer>(brokerConfigs.size()); + for (KafkaConfig config : brokerConfigs) { + brokers.add(startBroker(config)); + } + + // Write out Kafka client config to a temp file. + clientConfig = new File(KafkaTestUtils.getTempDir(), "kafka-config.xml"); + FileWriter writer = new FileWriter(clientConfig); + writer.append("<configuration>"); + for (String prop : Arrays.asList(KAFKA_BROKERS, KAFKA_ZOOKEEPERS)) { + writer.append("<property>"); + writer.append("<name>").append(prop).append("</name>"); + writer.append("<value>").append(getProps().getProperty(prop)).append("</value>"); + writer.append("</property>"); + } + writer.append("</configuration>"); + writer.close(); + } + + /** + * Shutdown the Kafka broker cluster. Attempting to {@link #setUp()} a cluster again after calling this method is not allowed; + * a new {@code KafkaBrokerTestHarness} must be created instead. + * + * @throws IllegalStateException if the Kafka broker cluster has already been {@link #tearDown() torn down} or has not been + * {@link #setUp()}. + */ + @Override + public void tearDown() throws IOException { + if (!setUp) { + throw new IllegalStateException("Not set up, cannot tear down"); + } + if (tornDown) { + throw new IllegalStateException("Already torn down, cannot tear down again"); + } + tornDown = true; + + for (KafkaServer broker : brokers) { + broker.shutdown(); + } + + for (KafkaServer broker : brokers) { + for (String logDir : asJavaIterable(broker.config().logDirs())) { + FileUtils.deleteDirectory(new File(logDir)); + } + } + + // Shutdown zookeeper + super.tearDown(); + } + + /** + * Returns properties for a Kafka producer. + * + * @return Producer properties. + */ + public Properties getProducerProps() { + StringBuilder brokers = new StringBuilder(); + for (int i = 0; i < brokerConfigs.size(); ++i) { + KafkaConfig config = brokerConfigs.get(i); + brokers.append((i > 0) ? "," : "").append(config.hostName()).append(":").append(config.port()); + } + + Properties props = new Properties(); + props.setProperty(KAFKA_BROKERS, brokers.toString()); + props.setProperty(KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS, "10000"); + + // These two properties below are increased from their defaults to help with the case that auto.create.topics.enable is + // disabled and a test tries to create a topic and immediately write to it + props.setProperty(KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS, Integer.toString(500)); + props.setProperty(KAFKA_PRODUCER_RETRY_MAX, Integer.toString(10)); + + return props; + } + + /** + * Returns properties for a Kafka consumer. + * + * @return Consumer properties. + */ + public Properties getConsumerProps() { + Properties props = new Properties(); + props.setProperty(KAFKA_ZOOKEEPERS, zookeeperConnect); + return props; + } + + /** + * Returns properties for either a Kafka producer or consumer. + * + * @return Combined producer and consumer properties. + */ + public Properties getProps() { + // Combine producer and consumer properties. 
+ Properties props = getProducerProps(); + props.putAll(getConsumerProps()); + return props; + } + + /** + * Returns configuration properties for each Kafka broker in the cluster. + * + * @return Broker properties. + */ + public List<Properties> getBrokerProps() { + List<Properties> props = new ArrayList<Properties>(brokers.size()); + for (KafkaServer broker : brokers) { + Properties prop = new Properties(); + prop.putAll(broker.config().props()); + props.add(prop); + } + return props; + } + + /** + * Creates a collection of Kafka Broker configurations based on the number of brokers and zookeeper. + * @param brokers the number of brokers to create configuration for. + * @param zookeeperPort the zookeeper port for the brokers to connect to. + * @return configuration for a collection of brokers. + * @throws IllegalArgumentException if {@code brokers} is less than 1 + */ + public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort) { + return getBrokerConfig(brokers, zookeeperPort, new Properties()); + } + + /** + * Creates a collection of Kafka Broker configurations based on the number of brokers and zookeeper. + * @param brokers the number of brokers to create configuration for. + * @param zookeeperPort the zookeeper port for the brokers to connect to. + * @param baseProperties basic properties that should be applied for each broker config. These properties will be + * honored in favor of any default properties. + * @return configuration for a collection of brokers. + * @throws IllegalArgumentException if {@code brokers} is less than 1 or {@code baseProperties} is {@code null}. + */ + public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort, Properties baseProperties) { + if (brokers < 1) { + throw new IllegalArgumentException("Invalid broker count: " + brokers); + } + if (baseProperties == null) { + throw new IllegalArgumentException("The 'baseProperties' cannot be 'null'."); + } + + int ports[] = KafkaTestUtils.getPorts(brokers); + + List<KafkaConfig> configs = new ArrayList<KafkaConfig>(brokers); + for (int i = 0; i < brokers; ++i) { + Properties props = new Properties(); + props.setProperty(KAFKA_ZOOKEEPERS, "localhost:" + zookeeperPort); + props.setProperty("broker.id", String.valueOf(i + 1)); + props.setProperty("host.name", "localhost"); + props.setProperty("port", String.valueOf(ports[i])); + props.setProperty("log.dir", KafkaTestUtils.getTempDir().getAbsolutePath()); + props.setProperty("log.flush.interval.messages", String.valueOf(1)); + props.setProperty("num.partitions", String.valueOf(PARTITIONS_PER_TOPIC)); + props.setProperty("default.replication.factor", String.valueOf(brokers)); + props.setProperty("auto.create.topics.enable", Boolean.FALSE.toString()); + + props.putAll(baseProperties); + + configs.add(new KafkaConfig(props)); + } + return configs; + } + + /** + * Returns location of Kafka client configuration file containing broker and zookeeper connection properties. + * <p> + * This file can be loaded using the {@code -conf} command option to easily achieve Kafka connectivity. 
+ * </p> + * + * @return Kafka client configuration file path + */ + public String getClientConfigPath() { + return clientConfig.getAbsolutePath(); + } + + private static KafkaServer startBroker(KafkaConfig config) { + KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty()); + server.startup(); + return server; + } + + private static class SystemTime implements Time { + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + @Override + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // Ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java new file mode 100644 index 0000000..f8eb2ff --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.utils; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +/** + * Assorted Kafka testing utility methods. + */ +public class KafkaTestUtils { + + private static final Random RANDOM = new Random(); + private static final String TEMP_DIR_PREFIX = "kafka-"; + + private static final Set<Integer> USED_PORTS = new HashSet<Integer>(); + + /** + * Creates and returns a new randomly named temporary directory. It will be deleted upon JVM exit. + * + * @return a new temporary directory. + * + * @throws RuntimeException if a new temporary directory could not be created. + */ + public static File getTempDir() { + File file = new File(System.getProperty("java.io.tmpdir"), TEMP_DIR_PREFIX + RANDOM.nextInt(10000000)); + if (!file.mkdirs()) { + throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath()); + } + file.deleteOnExit(); + return file; + } + + /** + * Returns an array containing the specified number of available local ports. + * + * @param count Number of local ports to identify and return. + * + * @return an array of available local port numbers. + * + * @throws RuntimeException if an I/O error occurs opening or closing a socket. 
+ */ + public static int[] getPorts(int count) { + int[] ports = new int[count]; + Set<ServerSocket> openSockets = new HashSet<ServerSocket>(count + USED_PORTS.size()); + + for (int i = 0; i < count; ) { + try { + ServerSocket socket = new ServerSocket(0); + int port = socket.getLocalPort(); + openSockets.add(socket); + + // Disallow port reuse. + if (!USED_PORTS.contains(port)) { + ports[i++] = port; + USED_PORTS.add(port); + } + } catch (IOException e) { + throw new RuntimeException("could not open socket", e); + } + } + + // Close the sockets so that their port numbers can be used by the caller. + for (ServerSocket socket : openSockets) { + try { + socket.close(); + } catch (IOException e) { + throw new RuntimeException("could not close socket", e); + } + } + + return ports; + } +} + http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java new file mode 100644 index 0000000..6ee102e --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.kafka.utils; + +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; + +import java.nio.charset.Charset; + +/** + * A {@link ZkSerializer Zookeeper serializer} for {@link String} objects. + * <p> + * Ported from the {@code kafka.utils.ZKStringSerializer} scala object. + * </p> + */ +public class ZkStringSerializer implements ZkSerializer { + + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + @Override + public byte[] serialize(Object data) throws ZkMarshallingError { + return ((String) data).getBytes(UTF_8); + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + return bytes != null ? 
new String(bytes, UTF_8) : null; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java new file mode 100644 index 0000000..c4a7e15 --- /dev/null +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.kafka.utils; + +import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import java.io.IOException; + +/** + * A test harness that brings up an embedded Zookeeper instance. + * <p> + * Adapted from the {@code kafka.zk.ZooKeeperTestHarness} class. + * </p> + */ +public class ZookeeperTestHarness { + + /** + * Zookeeper connection info. + */ + protected final String zookeeperConnect; + + private EmbeddedZookeeper zookeeper; + private final int zkConnectionTimeout; + private final int zkSessionTimeout; + + /** + * Zookeeper client connection. + */ + protected ZkUtils zkUtils; + + /** + * Creates a new Zookeeper broker test harness. + */ + public ZookeeperTestHarness() { + this(KafkaTestUtils.getPorts(1)[0]); + } + + /** + * Creates a new Zookeeper service test harness using the given port. + * + * @param zookeeperPort The port number to use for Zookeeper client connections. + */ + public ZookeeperTestHarness(int zookeeperPort) { + this.zookeeper = null; + this.zkUtils = null; + this.zkConnectionTimeout = 6000; + this.zkSessionTimeout = 6000; + this.zookeeperConnect = "localhost:" + zookeeperPort; + } + + /** + * Returns a client for communicating with the Zookeeper service. + * + * @return A Zookeeper client. + * + * @throws IllegalStateException + * if Zookeeper has not yet been {@link #setUp()}, or has already been {@link #tearDown() torn down}. + */ + public ZkClient getZkClient() { + if (zkUtils == null) { + throw new IllegalStateException("Zookeeper service is not active"); + } + return zkUtils.zkClient(); + } + + public ZkUtils getZkUtils() { + return zkUtils; + } + + /** + * Startup Zookeeper. + * + * @throws IOException if an error occurs during Zookeeper initialization. 
+ */ + public void setUp() throws IOException { + zookeeper = new EmbeddedZookeeper(zookeeperConnect); + ZkClient zkClient = new ZkClient(zookeeperConnect, zkSessionTimeout, zkConnectionTimeout, new ZkStringSerializer()); + ZkConnection connection = new ZkConnection(zookeeperConnect, zkSessionTimeout); + zkUtils = new ZkUtils(zkClient, connection, false); + } + + /** + * Shutdown Zookeeper. + */ + public void tearDown() throws IOException { + if (zkUtils != null) { + zkUtils.close(); + zkUtils = null; + } + if (zookeeper != null) { + zookeeper.shutdown(); + zookeeper = null; + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/resources/log4j.properties b/crunch-kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..0b3eeee --- /dev/null +++ b/crunch-kafka/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, A1 +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout + +# Print the date in ISO 8601 format +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +# Limit Apache logging to keep us from being overwhelmed as our tests stop and restart servers. +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.apache.zookeeper.client.ZooKeeperSaslClient=ERROR +log4j.logger.org.apache.hadoop.conf.Configuration=ERROR \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5b390ee..78ea085 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ under the License. <module>crunch-spark</module> <module>crunch-hive</module> <module>crunch-dist</module> + <module>crunch-kafka</module> </modules> <profiles> <profile> @@ -103,6 +104,7 @@ under the License. <hbase.version>1.0.0</hbase.version> <avro.classifier>hadoop2</avro.classifier> + <kafka.version>0.9.0.1</kafka.version> <scala.base.version>2.10</scala.base.version> <scala.version>2.10.4</scala.version> <scalatest.version>2.2.4</scalatest.version> @@ -455,6 +457,17 @@ under the License. 
</dependency> <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.base.version}</artifactId> + <version>${kafka.version}</version> + </dependency> + + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>${jsr305.version}</version>
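
For reference, below is a minimal, illustrative sketch of how the test harness added by this patch might be wired into a JUnit 4 integration test. Only KafkaBrokerTestHarness, its setUp()/tearDown()/getProps() methods, and the KAFKA_BROKERS / KAFKA_ZOOKEEPERS property keys come from the diff above; the example class name, test name, and package placement are hypothetical and are not part of the commit.

package org.apache.crunch.kafka.utils;

import java.util.Properties;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertNotNull;

// Illustrative sketch only: not part of this commit.
public class HarnessUsageExample {

  private KafkaBrokerTestHarness harness;

  @Before
  public void startCluster() throws Exception {
    // Starts an embedded Zookeeper plus the default number of brokers on free local ports.
    harness = new KafkaBrokerTestHarness();
    harness.setUp();
  }

  @After
  public void stopCluster() throws Exception {
    // Shuts down the brokers and Zookeeper and deletes their temporary directories.
    harness.tearDown();
  }

  @Test
  public void exposesBrokerAndZookeeperProperties() {
    // getProps() combines the producer properties (broker list, retry settings)
    // with the consumer properties (zookeeper connect string).
    Properties props = harness.getProps();
    assertNotNull(props.getProperty(KafkaBrokerTestHarness.KAFKA_BROKERS));
    assertNotNull(props.getProperty(KafkaBrokerTestHarness.KAFKA_ZOOKEEPERS));
  }
}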
