Repository: crunch
Updated Branches:
  refs/heads/master e929e0444 -> fbda02f46
CRUNCH-620: Reduce "isn't a known config" warnings by slimming down ConsumerConfig properties Resolved by tagging the Kafka connection properties so that the Kafka Consumers can be built with slimmer ConsumerConfig properties. Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fbda02f4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fbda02f4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fbda02f4 Branch: refs/heads/master Commit: fbda02f46961c17b3f444424b166fbf65262711c Parents: e929e04 Author: Stefan Mendoza <[email protected]> Authored: Mon Sep 12 22:38:41 2016 -0500 Committer: Micah Whitacre <[email protected]> Committed: Mon Oct 24 21:15:23 2016 -0500 ---------------------------------------------------------------------- .../org/apache/crunch/kafka/KafkaSource.java | 15 ++- .../kafka/inputformat/KafkaInputFormat.java | 108 ++++++++++++++++++- .../kafka/inputformat/KafkaRecordReader.java | 15 +-- .../org/apache/crunch/kafka/ClusterTest.java | 4 +- .../org/apache/crunch/kafka/KafkaSourceIT.java | 8 +- .../kafka/inputformat/KafkaInputFormatIT.java | 69 ++++++++++-- 6 files changed, 189 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java index 485604d..ba7788b 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java @@ -148,23 +148,20 @@ public class KafkaSource FormatBundle<KafkaInputFormat> bundle = 
FormatBundle.forInput(KafkaInputFormat.class); KafkaInputFormat.writeOffsetsToBundle(offsets, bundle); - - for (String name : kafkaConnectionProperties.stringPropertyNames()) { - bundle.set(name, kafkaConnectionProperties.getProperty(name)); - } + KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle); return bundle; } - private static <K, V> Properties copyAndSetProperties(Properties kakfaConnectionProperties) { + private static <K, V> Properties copyAndSetProperties(Properties kafkaConnectionProperties) { Properties props = new Properties(); - props.putAll(kakfaConnectionProperties); + props.putAll(kafkaConnectionProperties); //Setting the key/value deserializer to ensure proper translation from Kafka to PType format. props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName()); - return props; + return KafkaInputFormat.tagExistingKafkaConnectionProperties(props); } @@ -173,8 +170,8 @@ public class KafkaSource // consumer will get closed when the iterable is fully consumed. // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity // of parallelism when reading. 
- Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props); - return new KafkaRecordsIterable<>(consumer, offsets, props); + Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(KafkaInputFormat.filterConnectionProperties(props)); + return new KafkaRecordsIterable<>(consumer, offsets, KafkaInputFormat.filterConnectionProperties(props)); } http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java index eba4a97..0dadf97 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java @@ -36,17 +36,26 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; /** - * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped - * inside of a {@link BytesWritable} instance. + * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped inside of a + * {@link BytesWritable} instance. * * Populating the configuration of the input format is handled with the convenience method of * {@link #writeOffsetsToConfiguration(Map, Configuration)}. This should be done to ensure * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits} * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}. 
+ * + * To suppress warnings generated by unused configs in the {@link org.apache.kafka.clients.consumer.ConsumerConfig ConsumerConfig}, + * one can use {@link #tagExistingKafkaConnectionProperties(Properties) tagExistingKafkaConnectionProperties} and + * {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} to prefix Kafka connection properties with + * "org.apache.crunch.kafka.connection.properties" to allow for retrieval later using {@link #getConnectionPropertyFromKey(String) + * getConnectionPropertyFromKey} and {@link #filterConnectionProperties(Properties) filterConnectionProperties}. */ + public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable { /** @@ -74,6 +83,17 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> */ private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$"; + /** + * Constant for constructing configuration keys for the Kafka connection properties. + */ + private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties"; + + /** + * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig. + */ + private static final Pattern CONNECTION_PROPERTY_REGEX = + Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$"); + private Configuration configuration; @Override @@ -123,6 +143,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> /** * Writes the start and end offsets for the provided topic partitions to the {@code bundle}. + * * @param offsets The starting and ending offsets for the topics and partitions. * @param bundle the bundle into which the information should be persisted. 
*/ @@ -134,6 +155,7 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> /** * Writes the start and end offsets for the provided topic partitions to the {@code config}. + * * @param offsets The starting and ending offsets for the topics and partitions. * @param config the config into which the information should be persisted. */ @@ -232,4 +254,86 @@ public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> return value; } + + // The following methods are convenience methods for dealing with Kafka connection properties. This includes: + // - writing Kafka connection properties to a FormatBundle + // - generating tagged Kafka connection properties using the prefix "org.apache.crunch.kafka.connection.properties" + // - retrieving Kafka connection properties prefixed by "org.apache.crunch.kafka.connection.properties" + // - filtering out Kafka connection properties from a Properties object + // - tagging all properties in a Properties object with the Kafka connection properties prefix + // The tagging of the Kafka connection properties allows for suppression of "isn't a known config" ConsumerConfig warnings that + // are generated by unused properties carried over from a Hadoop configuration. + + /** + * Writes the Kafka connection properties to the {@code bundle}. + * + * @param connectionProperties the Kafka connection properties + * @param bundle the bundle into which the information should be persisted. + */ + public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) { + for (final String name : connectionProperties.stringPropertyNames()) { + bundle.set(name, connectionProperties.getProperty(name)); + } + } + + /** + * Prefixes a given property with "org.apache.crunch.kafka.connection.properties" to allow for filtering with + * {@link #filterConnectionProperties(Properties) filterConnectionProperties}. 
+ * + * @param property the Kafka connection property that will be prefixed for retrieval at a later time. + * @return the property prefixed "org.apache.crunch.kafka.connection.properties" + */ + static String generateConnectionPropertyKey(String property) { + return KAFKA_CONNECTION_PROPERTY_BASE + "." + property; + } + + /** + * + * Retrieves the original property that was tagged using {@link #generateConnectionPropertyKey(String) + * generateConnectionPropertyKey}. + * + * @param key the key that was tagged using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}. + * @return the original property prior to tagging. + */ + static String getConnectionPropertyFromKey(String key) { + // Strip off the base key + a trailing "." + return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1); + } + + /** + * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every + * property prefixed with "org.apache.crunch.kafka.connection.properties". + * + * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties" + * @return a {@link Properties} object representing Kafka connection properties + */ + public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) { + Properties taggedProperties = new Properties(); + + for (final String name : connectionProperties.stringPropertyNames()) { + taggedProperties.put(generateConnectionPropertyKey(name), connectionProperties.getProperty(name)); + } + + return taggedProperties; + } + + /** + * Filters out Kafka connection properties that were tagged using {@link #generateConnectionPropertyKey(String) + * generateConnectionPropertyKey}. + * + * @param props the properties to be filtered. + * @return the properties containing Kafka connection information that were tagged using + * {@link #generateConnectionPropertyKey(String)}. 
+ */ + public static Properties filterConnectionProperties(Properties props) { + Properties filteredProperties = new Properties(); + + for (final String name : props.stringPropertyNames()) { + if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) { + filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name)); + } + } + + return filteredProperties; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java index 14c8030..3ed799b 100644 --- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java +++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java @@ -46,6 +46,7 @@ import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT; import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY; import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT; import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties; +import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties; /** * A {@link RecordReader} for pulling data from Kafka. 
@@ -75,14 +76,15 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> { if(!(inputSplit instanceof KafkaInputSplit)){ throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type."); } - KafkaInputSplit split = (KafkaInputSplit) inputSplit; - topicPartition = split.getTopicPartition(); - - connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration()); + Properties kafkaConnectionProperties = filterConnectionProperties( + getKafkaConnectionProperties(taskAttemptContext.getConfiguration())); - consumer = new KafkaConsumer<>(connectionProperties); + consumer = new KafkaConsumer<>(kafkaConnectionProperties); + KafkaInputSplit split = (KafkaInputSplit) inputSplit; + TopicPartition topicPartition = split.getTopicPartition(); consumer.assign(Collections.singletonList(topicPartition)); + //suggested hack to gather info without gathering data consumer.poll(0); //now seek to the desired start location @@ -119,8 +121,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> { } return true; } else { - LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, - endingOffset); + LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset); } } record = null; http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java index 836039c..38ded40 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java @@ -24,6 +24,7 @@ import kafka.serializer.Decoder; import kafka.serializer.Encoder; import 
kafka.utils.VerifiableProperties; import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.kafka.inputformat.KafkaInputFormat; import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT; import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT; import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness; @@ -135,7 +136,8 @@ public class ClusterTest { public static Configuration getConsumerConfig() { Configuration kafkaConfig = new Configuration(conf); - KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig); + KafkaUtils.addKafkaConnectionProperties(KafkaInputFormat.tagExistingKafkaConnectionProperties( + getConsumerProperties()), kafkaConfig); return kafkaConfig; } http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java index 3800c24..7f1323e 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java @@ -48,7 +48,7 @@ import java.util.Properties; import java.util.Set; import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets; -import static org.hamcrest.CoreMatchers.notNullValue; +import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.matchers.JUnitMatchers.hasItem; @@ -105,8 +105,10 @@ public class KafkaSourceIT { Set<String> keysRead = new HashSet<>(); int numRecordsFound = 0; + String currentKey; for (Pair<BytesWritable, BytesWritable> values : read.materialize()) { - assertThat(keys, hasItem(new String(values.first().getBytes()))); + currentKey = new 
String(values.first().getBytes()); + assertThat(keys, hasItem(currentKey)); numRecordsFound++; keysRead.add(new String(values.first().getBytes())); } @@ -166,4 +168,4 @@ public class KafkaSourceIT { return new String(input.first().getBytes()); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/fbda02f4/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java ---------------------------------------------------------------------- diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java index d760a02..3e7ab6f 100644 --- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java +++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java @@ -89,13 +89,17 @@ public class KafkaInputFormatIT { topic = testName.getMethodName(); consumerProps = ClusterTest.getConsumerProperties(); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), + KafkaSource.BytesDeserializer.class.getName()); + consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), + KafkaSource.BytesDeserializer.class.getName()); config = ClusterTest.getConsumerConfig(); - config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); - config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName()); + 
config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), + KafkaSource.BytesDeserializer.class.getName()); + config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), + KafkaSource.BytesDeserializer.class.getName()); } @Test @@ -183,9 +187,11 @@ public class KafkaInputFormatIT { recordReader.initialize(split, taskContext); int numRecordsFound = 0; + String currentKey; while (recordReader.nextKeyValue()) { - keysRead.add(new String(recordReader.getCurrentKey().getBytes())); - assertThat(keys, hasItem(new String(recordReader.getCurrentKey().getBytes()))); + currentKey = new String(recordReader.getCurrentKey().getBytes()); + keysRead.add(currentKey); + assertThat(keys, hasItem(currentKey)); assertThat(recordReader.getCurrentValue(), is(notNullValue())); numRecordsFound++; } @@ -354,6 +360,54 @@ public class KafkaInputFormatIT { } } + @Test + public void generateConnectionPropertyKey() { + String propertyName = "some.property"; + String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName); + String expected = "org.apache.crunch.kafka.connection.properties.some.property"; + assertThat(expected, is(actual)); + } + + @Test + public void getConnectionPropertyFromKey() { + String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property"; + String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty); + String expected = "some.property"; + assertThat(expected, is(actual)); + } + + @Test + public void writeConnectionPropertiesToBundle() { + FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class); + Properties connectionProperties = new Properties(); + connectionProperties.put("key1", "value1"); + connectionProperties.put("key2", "value2"); + KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual); + + FormatBundle<KafkaInputFormat> expected = 
FormatBundle.forInput(KafkaInputFormat.class); + expected.set("key1", "value1"); + expected.set("key2", "value2"); + + assertThat(expected, is(actual)); + } + + @Test + public void filterConnectionProperties() { + Properties props = new Properties(); + props.put("org.apache.crunch.kafka.connection.properties.key1", "value1"); + props.put("org.apache.crunch.kafka.connection.properties.key2", "value2"); + props.put("org_apache_crunch_kafka_connection_properties.key3", "value3"); + props.put("org.apache.crunch.another.prefix.properties.key4", "value4"); + + Properties actual = KafkaInputFormat.filterConnectionProperties(props); + Properties expected = new Properties(); + expected.put("key1", "value1"); + expected.put("key2", "value2"); + + assertThat(expected, is(actual)); + } + + @Test(expected=IllegalStateException.class) public void getOffsetsFromConfigMissingStart() { Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>(); @@ -403,5 +457,4 @@ public class KafkaInputFormatIT { Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config); } - -} +} \ No newline at end of file
