This is an automated email from the ASF dual-hosted git repository. alopresto pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 698c48f NIFI-6207: fixing 'partition' typos in AWS Kinesis processor and test as well as Kafka ConsumeLease classes. 698c48f is described below commit 698c48fc60b34e14db253e2bbad556f85ea170b9 Author: Rahul Patil <rpa...@yahoo-corp.jp> AuthorDate: Mon Apr 15 12:32:36 2019 +0900 NIFI-6207: fixing 'partition' typos in AWS Kinesis processor and test as well as Kafka ConsumeLease classes. Signed-off-by: Andy LoPresto <alopre...@apache.org> --- .../apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java | 4 ++-- .../apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java | 2 +- .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++-- .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++-- .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++-- .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++-- .../java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java index 13aedfe..8d2d1a7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java @@ -83,7 +83,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor { AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE)); /** A random number generator for cases where partition key is not available */ - protected Random randomParitionKeyGenerator = new Random(); + protected Random randomPartitionKeyGenerator = new Random(); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -125,7 +125,7 @@ public class PutKinesisStream extends AbstractKinesisStreamProcessor { if (StringUtils.isBlank(partitionKey) == false) { record.setPartitionKey(partitionKey); } else { - record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt())); + record.setPartitionKey(Integer.toString(randomPartitionKeyGenerator.nextInt())); } if (recordHash.containsKey(streamName) == false) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java index b195423..0aa87c5 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java @@ -92,7 +92,7 @@ public class ITPutKinesisStream { @Test public void testIntegrationWithDynamicPartitionSuccess() throws Exception { - runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${parition}"); + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partition}"); runner.assertValid(); Map<String,String> properties = new HashMap<>(); properties.put("partition", "px"); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 85c85f6..bde07a6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -133,7 +133,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition commit(); } @@ -147,7 +147,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index c61c549..a871097 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -142,7 +142,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition commit(); } @@ -156,7 +156,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 99ef239..6d4c6d0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -110,7 +110,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion commit(); } @@ -124,7 +124,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index f6b0d94..07831bb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -142,7 +142,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition commit(); } @@ -156,7 +156,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 2a90219..6b4dc41 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -142,7 +142,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition commit(); } @@ -156,7 +156,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe */ @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { - logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); } /**