Repository: kafka Updated Branches: refs/heads/0.8.2 17c8bdcbb -> 988e695fa
KAFKA-1723; (followup patch to fix javadoc for java 8) make the metrics name in new producer more standard; patched by Manikumar Reddy; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/988e695f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/988e695f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/988e695f Branch: refs/heads/0.8.2 Commit: 988e695fa294c5e9799509f6313b95e1f96682da Parents: 17c8bdc Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Fri Jan 16 18:09:42 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Fri Jan 16 18:09:42 2015 -0800 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 34 ++++++++++---------- .../kafka/clients/producer/MockProducer.java | 8 ++--- .../org/apache/kafka/common/MetricName.java | 24 ++++++++------ 3 files changed, 36 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 0bfda4b..30477d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -264,33 +264,33 @@ public class KafkaProducer<K,V> implements Producer<K,V> { * sending the record. * <p> * If you want to simulate a simple blocking call you can do the following: - * - * <pre> - * producer.send(new ProducerRecord("the-topic", "key, "value")).get(); - * </pre> + * + * <pre>{@code + * producer.send(new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes())).get(); + * }</pre> * <p> * Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that * will be invoked when the request is complete. - * - * <pre> - * ProducerRecord record = new ProducerRecord("the-topic", "key, "value"); + * + * <pre>{@code + * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", "key".getBytes(), "value".getBytes()); * producer.send(myRecord, - * new Callback() { + * new Callback() { * public void onCompletion(RecordMetadata metadata, Exception e) { * if(e != null) * e.printStackTrace(); * System.out.println("The offset of the record we just sent is: " + metadata.offset()); * } - * }); - * </pre> - * + * }); + * }</pre> + * * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>: - * - * <pre> - * producer.send(new ProducerRecord(topic, partition, key, value), callback1); - * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2); - * </pre> + * + * <pre>{@code + * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); + * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2); + * }</pre> * <p> * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or * they will delay the sending of messages from other threads. If you want to execute blocking or computationally @@ -329,7 +329,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } - ProducerRecord serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); + ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); int partition = partitioner.partition(serializedRecord, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 21c25a9..cdca682 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -41,7 +41,7 @@ public class MockProducer implements Producer<byte[],byte[]> { private final Cluster cluster; private final Partitioner partitioner = new Partitioner(); - private final List<ProducerRecord> sent; + private final List<ProducerRecord<byte[], byte[]>> sent; private final Deque<Completion> completions; private boolean autoComplete; private Map<TopicPartition, Long> offsets; @@ -59,7 +59,7 @@ public class MockProducer implements Producer<byte[],byte[]> { this.cluster = cluster; this.autoComplete = autoComplete; this.offsets = new HashMap<TopicPartition, Long>(); - this.sent = new ArrayList<ProducerRecord>(); + this.sent = new ArrayList<ProducerRecord<byte[], byte[]>>(); this.completions = new ArrayDeque<Completion>(); } @@ -144,8 +144,8 @@ public class MockProducer implements Producer<byte[],byte[]> { /** * Get the list of sent records since the last call to {@link #clear()} */ - public synchronized List<ProducerRecord> history() { - return new ArrayList<ProducerRecord>(this.sent); + public synchronized List<ProducerRecord<byte[], byte[]>> history() { + return new ArrayList<ProducerRecord<byte[], byte[]>>(this.sent); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/988e695f/clients/src/main/java/org/apache/kafka/common/MetricName.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java index 4e810d5..7e977e9 100644 --- a/clients/src/main/java/org/apache/kafka/common/MetricName.java +++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java @@ -19,34 +19,40 @@ import org.apache.kafka.common.utils.Utils; /** * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes - * <p/> + * <p> * This class captures the following parameters * <pre> * <b>name</b> The name of the metric * <b>group</b> logical group name of the metrics to which this metric belongs. * <b>description</b> A human-readable description to include in the metric. This is optional. * <b>tags</b> additional key/value attributes of the metric. This is optional. - * </pre> + * </pre> * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting. - * + * <p> * Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b> - * + * <p> * Usage looks something like this: - * <pre> + * <pre>{@code * // set up metrics: * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors - * Sensor sensor = metrics.sensor("message-sizes"); + * Sensor sensor = metrics.sensor("message-sizes"); + * * Map<String, String> metricTags = new LinkedHashMap<String, String>(); * metricTags.put("client-id", "producer-1"); * metricTags.put("topic", "topic"); - * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags); + * + * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags); * sensor.add(metricName, new Avg()); - * metricName = new MetricName("message-size-max", "producer-metrics",metricTags); + * + * metricName = new MetricName("message-size-max", "producer-metrics", metricTags); * sensor.add(metricName, new Max()); * + * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic"); + * sensor.add(metricName, new Min()); + * * // as messages are sent we record the sizes * sensor.record(messageSize); - * </pre> + * }</pre> */ public final class MetricName {