Repository: apex-malhar Updated Branches: refs/heads/master c4a11299b -> 85566a38f
APEXMALHAR-2133 #resolve #comment Handle case partitionsFor() returns null Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/273b2072 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/273b2072 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/273b2072 Branch: refs/heads/master Commit: 273b20724a796786d52235e6535303f86b35800f Parents: 32840a2 Author: brightchen <[email protected]> Authored: Tue Jul 5 13:39:06 2016 -0700 Committer: brightchen <[email protected]> Committed: Wed Jul 6 16:50:48 2016 -0700 ---------------------------------------------------------------------- .../malhar/kafka/AbstractKafkaPartitioner.java | 60 +++++++++++++------- 1 file changed, 38 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/273b2072/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index c6e47e9..772399d 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -93,39 +93,46 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>(); - for (int i = 0; i < clusters.length; i++) { - metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>()); - for (String topic : topics) { - int tryTime = 3; - while (tryTime-- > 0) { - try { - List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); - if (logger.isDebugEnabled()) { - logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); + try { + for (int i = 0; i < clusters.length; i++) { + metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>()); + for (String topic : topics) { + //try several time if partitionsFor(topic) returns null or throws exception. + //partitionsFor(topic) will return null if the topic is invalid or hasn't completed + int tryTime = 10; + while (tryTime-- > 0) { + try { + List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic); + if (ptis != null) { + if (logger.isDebugEnabled()) { + logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis)); + } + metadata.get(clusters[i]).put(topic, ptis); + break; + } + + logger.warn("Partition metadata for topic {} is null. retrying...", topic); + + } catch (Exception e) { + logger.warn("Got Exception when trying get partition info for topic {}.", topic, e); } - metadata.get(clusters[i]).put(topic, ptis); - break; - } catch (AuthorizationException ae) { - logger.error("Kafka AuthorizationException."); - throw new RuntimeException("Kafka AuthorizationException.", ae); - } catch (Exception e) { - logger.warn("Got Exception when trying get partition info for topic {}.", topic, e); + try { Thread.sleep(100); } catch (Exception e1) { //ignore } + } //end while + + if (tryTime == 0) { + throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic); } } - if (tryTime == 0) { - throw new RuntimeException("Get partition info completely failed. Please check the log file"); - } } - metadataRefreshClients.get(i).close(); + } finally { + closeClients(); } - metadataRefreshClients = null; - List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null; try { parts = assign(metadata); @@ -169,6 +176,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa } } + protected void closeClients() + { + for (KafkaConsumer<byte[], byte[]> consume : metadataRefreshClients) { + consume.close(); + } + metadataRefreshClients = null; + } + + @Override public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) {
