Repository: flume
Updated Branches:
  refs/heads/trunk ed9f6ff6f -> fa1ee05af
FLUME-3027. Change Kafka Channel to clear offsets map after commit

This change adds a call to clear the offsets map after a commit so as to
avoid repeatedly committing already-committed offsets. Also updates various
debug and trace log messages/calls to help with troubleshooting.

This closes #92

Reviewers: Attila Simon, Bessenyei Balázs Donát

(Jeff Holoman via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/fa1ee05a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/fa1ee05a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/fa1ee05a

Branch: refs/heads/trunk
Commit: fa1ee05af38bcf08ed18ff36d4284e68836a9054
Parents: ed9f6ff
Author: Jeff Holoman <[email protected]>
Authored: Wed Nov 23 10:52:06 2016 -0500
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Tue Dec 6 07:28:28 2016 +0000

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java | 129 ++++++++++++-------
 1 file changed, 80 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/fa1ee05a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index cc7bb48..6684bea 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,9 +103,9 @@ public class KafkaChannel extends BasicChannelSemantics {
   private Integer staticPartitionId;
   private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
-  //used to indicate if a rebalance has occurred during the current transaction
+  // used to indicate if a rebalance has occurred during the current transaction
   AtomicBoolean rebalanceFlag = new AtomicBoolean();
-  //This isn't a Kafka property per se, but we allow it to be configurable
+  // This isn't a Kafka property per se, but we allow it to be configurable
   private long pollTimeout = DEFAULT_POLL_TIMEOUT;
 
@@ -154,8 +155,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     producer.close();
     counter.stop();
     super.stop();
-    logger.info("Kafka channel {} stopped. Metrics: {}", getName(),
-        counter);
+    logger.info("Kafka channel {} stopped.", getName());
   }
 
   @Override
@@ -166,7 +166,7 @@ public class KafkaChannel extends BasicChannelSemantics {
 
   @Override
   public void configure(Context ctx) {
-    //Can remove in the next release
+    // Can remove in the next release
     translateOldProps(ctx);
 
     topicStr = ctx.getString(TOPIC_CONFIG);
@@ -217,7 +217,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       logger.warn("{} is deprecated. Please use the parameter {}", "topic", TOPIC_CONFIG);
     }
 
-    //Broker List
+    // Broker List
     // If there is no value we need to check and set the old param and log a warning message
     if (!(ctx.containsKey(BOOTSTRAP_SERVERS_CONFIG))) {
       String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
@@ -230,7 +230,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
     }
 
-    //GroupId
+    // GroupId
     // If there is an old Group Id set, then use that if no groupId is set.
     if (!(ctx.containsKey(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG))) {
       String oldGroupId = ctx.getString(GROUP_ID_FLUME);
@@ -265,7 +265,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
-    //Defaults overridden based on config
+    // Defaults overridden based on config
     producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
   }
@@ -279,9 +279,9 @@ public class KafkaChannel extends BasicChannelSemantics {
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
     consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
-    //Defaults overridden based on config
+    // Defaults overridden based on config
     consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
-    //These always take precedence over config
+    // These always take precedence over config
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -313,8 +313,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     try {
       Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
       if (!kafkaOffsets.isEmpty()) {
-        logger.info("Found Kafka offsets for topic " + topicStr +
-            ". Will not migrate from zookeeper");
+        logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
         logger.debug("Offsets found: {}", kafkaOffsets);
         return;
       }
@@ -373,6 +372,7 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
+    c.consumer.wakeup();
     c.consumer.close();
   }
 
@@ -434,7 +434,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       if (staticPartitionId != null) {
         partitionId = staticPartitionId;
       }
-      //Allow a specified header to override a static ID
+      // Allow a specified header to override a static ID
       if (partitionHeader != null) {
         String headerVal = event.getHeaders().get(partitionHeader);
         if (headerVal != null) {
@@ -460,6 +460,7 @@ public class KafkaChannel extends BasicChannelSemantics {
     @SuppressWarnings("unchecked")
     @Override
     protected Event doTake() throws InterruptedException {
+      logger.trace("Starting event take");
       type = TransactionType.TAKE;
       try {
         if (!(consumerAndRecords.get().uuid.equals(channelUUID))) {
@@ -482,11 +483,10 @@ public class KafkaChannel extends BasicChannelSemantics {
         if (!consumerAndRecords.get().failedEvents.isEmpty()) {
           e = consumerAndRecords.get().failedEvents.removeFirst();
         } else {
-
-          if (logger.isDebugEnabled()) {
-            logger.debug("Assigment: {}", consumerAndRecords.get().consumer.assignment().toString());
+          if ( logger.isTraceEnabled() ) {
+            logger.trace("Assignment during take: {}",
+                consumerAndRecords.get().consumer.assignment().toString());
           }
-
           try {
             long startTime = System.nanoTime();
             if (!consumerAndRecords.get().recordIterator.hasNext()) {
@@ -497,24 +497,20 @@ public class KafkaChannel extends BasicChannelSemantics {
             e = deserializeValue(record.value(), parseAsFlumeEvent);
             TopicPartition tp = new TopicPartition(record.topic(), record.partition());
             OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1, batchUUID);
-            consumerAndRecords.get().offsets.put(tp, oam);
-
-            if (logger.isTraceEnabled()) {
-              logger.trace("Took offset: {}", consumerAndRecords.get().offsets.toString());
-            }
+            consumerAndRecords.get().saveOffsets(tp,oam);
 
             //Add the key to the header
             if (record.key() != null) {
               e.getHeaders().put(KEY_HEADER, record.key());
             }
 
-            if (logger.isDebugEnabled()) {
-              logger.debug("Processed output from partition {} offset {}",
-                  record.partition(), record.offset());
-            }
-
             long endTime = System.nanoTime();
             counter.addToKafkaEventGetTimer((endTime - startTime) / (1000 * 1000));
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("{} processed output from partition {} offset {}",
+                  new Object[] {getName(), record.partition(), record.offset()});
+            }
           } else {
             return null;
           }
@@ -532,6 +528,7 @@ public class KafkaChannel extends BasicChannelSemantics {
 
     @Override
     protected void doCommit() throws InterruptedException {
+      logger.trace("Starting commit");
       if (type.equals(TransactionType.NONE)) {
         return;
       }
@@ -564,15 +561,24 @@ public class KafkaChannel extends BasicChannelSemantics {
               ex);
         }
       } else {
+        // event taken ensures that we have collected events in this transaction
+        // before committing
         if (consumerAndRecords.get().failedEvents.isEmpty() && eventTaken) {
+          logger.trace("About to commit batch");
           long startTime = System.nanoTime();
           consumerAndRecords.get().commitOffsets();
           long endTime = System.nanoTime();
           counter.addToKafkaCommitTimer((endTime - startTime) / (1000 * 1000));
-          consumerAndRecords.get().printCurrentAssignment();
+          if (logger.isDebugEnabled()) {
+            logger.debug(consumerAndRecords.get().getCommittedOffsetsString());
+          }
+        }
+
+        int takes = events.get().size();
+        if (takes > 0) {
+          counter.addToEventTakeSuccessCount(takes);
+          events.get().clear();
         }
-        counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
-        events.get().clear();
       }
     }
@@ -585,7 +591,7 @@ public class KafkaChannel extends BasicChannelSemantics {
         producerRecords.get().clear();
         kafkaFutures.get().clear();
       } else {
-        counter.addToRollbackCounter(Long.valueOf(events.get().size()));
+        counter.addToRollbackCounter(events.get().size());
         consumerAndRecords.get().failedEvents.addAll(events.get());
         events.get().clear();
       }
@@ -676,34 +682,59 @@ public class KafkaChannel extends BasicChannelSemantics {
       this.recordIterator = records.iterator();
     }
 
-    void poll() {
-      this.records = consumer.poll(pollTimeout);
-      this.recordIterator = records.iterator();
-      logger.trace("polling");
+    private void poll() {
+      logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
+      try {
+        records = consumer.poll(pollTimeout);
+        recordIterator = records.iterator();
+        logger.debug("{} returned {} records from last poll", getName(), records.count());
+      } catch (WakeupException e) {
+        logger.trace("Consumer woken up for channel {}.", getName());
+      }
     }
 
-    void commitOffsets() {
-      this.consumer.commitSync(offsets);
+    private void commitOffsets() {
+      try {
+        consumer.commitSync(offsets);
+      } catch (Exception e) {
+        logger.info("Error committing offsets.", e);
+      } finally {
+        logger.trace("About to clear offsets map.");
+        offsets.clear();
+      }
     }
 
-    // This will reset the latest assigned partitions to the last committed offsets;
+    private String getOffsetMapString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getName()).append(" current offsets map: ");
+      for (TopicPartition tp : offsets.keySet()) {
+        sb.append("p").append(tp.partition()).append("-")
+            .append(offsets.get(tp).offset()).append(" ");
+      }
+      return sb.toString();
+    }
 
-    public void printCurrentAssignment() {
+    // This prints the current committed offsets when debug is enabled
+    private String getCommittedOffsetsString() {
       StringBuilder sb = new StringBuilder();
-      for (TopicPartition tp : this.consumer.assignment()) {
+      sb.append(getName()).append(" committed: ");
+      for (TopicPartition tp : consumer.assignment()) {
         try {
-          sb.append("Committed: [").append(tp).append(",")
-              .append(this.consumer.committed(tp).offset())
-              .append(",").append(this.consumer.committed(tp).metadata()).append("]");
-          if (logger.isDebugEnabled()) {
-            logger.debug(sb.toString());
-          }
+          sb.append("[").append(tp).append(",")
+              .append(consumer.committed(tp).offset())
+              .append("] ");
         } catch (NullPointerException npe) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Committed {}", tp);
-          }
+          logger.debug("Committed {}", tp);
         }
       }
+      return sb.toString();
+    }
+
+    private void saveOffsets(TopicPartition tp, OffsetAndMetadata oam) {
+      offsets.put(tp,oam);
+      if (logger.isTraceEnabled()) {
+        logger.trace(getOffsetMapString());
+      }
     }
   }
 }
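
The heart of the fix is the commit-then-clear pattern in commitOffsets() above: the per-transaction offsets map is emptied once commitSync() returns (or fails), so a later doCommit() can no longer re-submit positions for partitions that saw no new takes. Below is a minimal stand-alone sketch of the same pattern; the class and method names (OffsetTracker, saveOffset) are illustrative rather than Flume's, and it assumes the same kafka-clients API generation the patch targets.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Stand-alone sketch of the commit-then-clear pattern from this patch.
public class OffsetTracker {
  private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  private final KafkaConsumer<String, byte[]> consumer;

  public OffsetTracker(KafkaConsumer<String, byte[]> consumer) {
    this.consumer = consumer;
  }

  // Analogous to saveOffsets() in the patch: remember the next offset to
  // read for this partition. Kafka commits name the *next* record to
  // consume, hence the +1.
  public void saveOffset(String topic, int partition, long consumedOffset) {
    offsets.put(new TopicPartition(topic, partition),
        new OffsetAndMetadata(consumedOffset + 1));
  }

  // Analogous to commitOffsets() in the patch: push this batch's positions
  // to Kafka, then always empty the map so already-committed offsets are
  // never submitted again by a later transaction.
  public void commitOffsets() {
    try {
      consumer.commitSync(offsets);
    } finally {
      offsets.clear();
    }
  }
}

Clearing in a finally block mirrors the patch: even when commitSync() throws, the stale entries are dropped rather than retried on the next transaction.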

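The companion change is the consumer.wakeup() call added in decommissionConsumerAndRecords(), paired with the new WakeupException handler in poll(). wakeup() is the one KafkaConsumer method intended to be called from another thread; it makes a blocking poll() throw WakeupException so the consumer can be shut down without waiting out the full poll timeout. A hypothetical sketch of that handshake, again assuming the patch-era kafka-clients API (the class and topic names here are invented for illustration):

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical illustration of the wakeup-before-close handshake.
public class WakeupDemo {
  private final KafkaConsumer<String, byte[]> consumer;

  public WakeupDemo(KafkaConsumer<String, byte[]> consumer) {
    this.consumer = consumer;
    consumer.subscribe(Collections.singletonList("demo-topic"));
  }

  // Poll once; if wakeup() fires mid-poll, treat it as an empty poll and
  // let the caller decide whether to stop.
  public ConsumerRecords<String, byte[]> pollOnce(long timeoutMs) {
    try {
      return consumer.poll(timeoutMs);
    } catch (WakeupException e) {
      return ConsumerRecords.empty();
    }
  }

  // Mirrors decommissionConsumerAndRecords(): abort any in-flight poll,
  // then close the consumer.
  public void shutdown() {
    consumer.wakeup();
    consumer.close();
  }
}

Catching WakeupException and treating it as an empty poll parallels the patched poll(), which logs the wakeup at trace level and falls through.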