Repository: nifi
Updated Branches:
  refs/heads/master 1fc1d38fd -> c138987bb


NIFI-4656, NIFI-4680: This closes #2330. Fix error handling in consume/publish 
kafka processors. Address issue with HortonworksSchemaRegistry throwing 
RuntimeException when it should be IOException. Fixed bug in 
ConsumerLease/ConsumeKafkaRecord that caused it to report too many records 
received

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c138987b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c138987b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c138987b

Branch: refs/heads/master
Commit: c138987bb46b975379daa00e57c42f498b6ef207
Parents: 1fc1d38
Author: Mark Payne <[email protected]>
Authored: Fri Dec 8 11:20:32 2017 -0500
Committer: joewitt <[email protected]>
Committed: Fri Dec 8 16:01:14 2017 -0500

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  |  56 +++++----
 .../kafka/pubsub/PublishKafkaRecord_0_11.java   |   7 +-
 .../processors/kafka/pubsub/ConsumerLease.java  |  56 +++++----
 .../kafka/pubsub/PublishKafkaRecord_1_0.java    |   7 +-
 .../hortonworks/HortonworksSchemaRegistry.java  | 125 +++++++++++++------
 5 files changed, 160 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c138987b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index eed797e..4d9a5b6 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
 
                     try {
                         reader = readerFactory.createRecordReader(attributes, 
in, logger);
+                    } catch (final IOException e) {
+                        yield();
+                        rollback(topicPartition);
+                        handleParseFailure(consumerRecord, session, e, "Failed 
to parse message from Kafka due to comms failure. Will roll back session and 
try again momentarily.");
+                        closeWriter(writer);
+                        return;
                     } catch (final Exception e) {
                         handleParseFailure(consumerRecord, session, e);
                         continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
                             } catch (final Exception e) {
                                 logger.error("Failed to obtain Schema for 
FlowFile. Will roll back the Kafka message offsets.", e);
 
-                                try {
-                                    rollback(topicPartition);
-                                } catch (final Exception rollbackException) {
-                                    logger.warn("Attempted to rollback Kafka 
message offset but was unable to do so", rollbackException);
-                                }
-
+                                rollback(topicPartition);
                                 yield();
+
                                 throw new ProcessException(e);
                             }
 
@@ -572,40 +574,42 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
                         }
 
                         tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", 
records.size(), false);
+                        session.adjustCounter("Records Received", 1L, false);
                     }
                 }
             }
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will 
roll back session and any un-committed offsets from Kafka.", e);
 
-            try {
-                if (writer != null) {
-                    writer.close();
-                }
-            } catch (final Exception ioe) {
-                logger.warn("Failed to close Record Writer", ioe);
-            }
-
-            try {
-                rollback(topicPartition);
-            } catch (final Exception rollbackException) {
-                logger.warn("Attempted to rollback Kafka message offset but 
was unable to do so", rollbackException);
-            }
+            closeWriter(writer);
+            rollback(topicPartition);
 
             throw new ProcessException(e);
         }
     }
 
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
 
     private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = 
uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
+        try {
+            OffsetAndMetadata offsetAndMetadata = 
uncommittedOffsetsMap.get(topicPartition);
+            if (offsetAndMetadata == null) {
+                offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+            }
 
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
+            final long offset = offsetAndMetadata == null ? 0L : 
offsetAndMetadata.offset();
+            kafkaConsumer.seek(topicPartition, offset);
+        } catch (final Exception rollbackException) {
+            logger.warn("Attempted to rollback Kafka message offset but was 
unable to do so", rollbackException);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c138987b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
index 093375b..d42df15 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_0_11 extends 
AbstractProcessor {
             }
 
             // Send each FlowFile to Kafka asynchronously.
-            for (final FlowFile flowFile : flowFiles) {
+            final Iterator<FlowFile> itr = flowFiles.iterator();
+            while (itr.hasNext()) {
+                final FlowFile flowFile = itr.next();
+
                 if (!isScheduled()) {
                     // If stopped, re-queue FlowFile instead of sending it
                     if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_0_11 extends 
AbstractProcessor {
                     }
 
                     session.transfer(flowFile);
+                    itr.remove();
                     continue;
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c138987b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index a2a449c..2e7e2d4 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -519,6 +519,12 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
 
                     try {
                         reader = readerFactory.createRecordReader(attributes, 
in, logger);
+                    } catch (final IOException e) {
+                        yield();
+                        rollback(topicPartition);
+                        handleParseFailure(consumerRecord, session, e, "Failed 
to parse message from Kafka due to comms failure. Will roll back session and 
try again momentarily.");
+                        closeWriter(writer);
+                        return;
                     } catch (final Exception e) {
                         handleParseFailure(consumerRecord, session, e);
                         continue;
@@ -543,13 +549,9 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
                             } catch (final Exception e) {
                                 logger.error("Failed to obtain Schema for 
FlowFile. Will roll back the Kafka message offsets.", e);
 
-                                try {
-                                    rollback(topicPartition);
-                                } catch (final Exception rollbackException) {
-                                    logger.warn("Attempted to rollback Kafka 
message offset but was unable to do so", rollbackException);
-                                }
-
+                                rollback(topicPartition);
                                 yield();
+
                                 throw new ProcessException(e);
                             }
 
@@ -572,40 +574,42 @@ public abstract class ConsumerLease implements Closeable, 
ConsumerRebalanceListe
                         }
 
                         tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", 
records.size(), false);
+                        session.adjustCounter("Records Received", 1L, false);
                     }
                 }
             }
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will 
roll back session and any un-committed offsets from Kafka.", e);
 
-            try {
-                if (writer != null) {
-                    writer.close();
-                }
-            } catch (final Exception ioe) {
-                logger.warn("Failed to close Record Writer", ioe);
-            }
-
-            try {
-                rollback(topicPartition);
-            } catch (final Exception rollbackException) {
-                logger.warn("Attempted to rollback Kafka message offset but 
was unable to do so", rollbackException);
-            }
+            closeWriter(writer);
+            rollback(topicPartition);
 
             throw new ProcessException(e);
         }
     }
 
+    private void closeWriter(final RecordSetWriter writer) {
+        try {
+            if (writer != null) {
+                writer.close();
+            }
+        } catch (final Exception ioe) {
+            logger.warn("Failed to close Record Writer", ioe);
+        }
+    }
 
     private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = 
uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
+        try {
+            OffsetAndMetadata offsetAndMetadata = 
uncommittedOffsetsMap.get(topicPartition);
+            if (offsetAndMetadata == null) {
+                offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+            }
 
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
+            final long offset = offsetAndMetadata == null ? 0L : 
offsetAndMetadata.offset();
+            kafkaConsumer.seek(topicPartition, offset);
+        } catch (final Exception rollbackException) {
+            logger.warn("Attempted to rollback Kafka message offset but was 
unable to do so", rollbackException);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c138987b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index c125d62..517cb0c 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -378,7 +379,10 @@ public class PublishKafkaRecord_1_0 extends 
AbstractProcessor {
             }
 
             // Send each FlowFile to Kafka asynchronously.
-            for (final FlowFile flowFile : flowFiles) {
+            final Iterator<FlowFile> itr = flowFiles.iterator();
+            while (itr.hasNext()) {
+                final FlowFile flowFile = itr.next();
+
                 if (!isScheduled()) {
                     // If stopped, re-queue FlowFile instead of sending it
                     if (useTransactions) {
@@ -388,6 +392,7 @@ public class PublishKafkaRecord_1_0 extends 
AbstractProcessor {
                     }
 
                     session.transfer(flowFile);
+                    itr.remove();
                     continue;
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c138987b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index ccb54b0..f37c927 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.schemaregistry.hortonworks;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -224,22 +225,32 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
 
 
     @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws 
org.apache.nifi.schema.access.SchemaNotFoundException {
+    public RecordSchema retrieveSchema(final String schemaName) throws 
org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo metadataInfo = 
client.getSchemaMetadataInfo(schemaName);
-        if (metadataInfo == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
-        }
 
-        final Long schemaId = metadataInfo.getId();
-        if (schemaId == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
-        }
+        final SchemaVersionInfo versionInfo;
+        final Long schemaId;
+        final Integer version;
+
+        try {
+            final SchemaMetadataInfo metadataInfo = 
client.getSchemaMetadataInfo(schemaName);
+            if (metadataInfo == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
+            }
+
+            schemaId = metadataInfo.getId();
+            if (schemaId == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
+            }
 
-        final SchemaVersionInfo versionInfo = 
getLatestSchemaVersionInfo(client, schemaName);
-        final Integer version = versionInfo.getVersion();
-        if (version == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
+            versionInfo = getLatestSchemaVersionInfo(client, schemaName);
+            version = versionInfo.getVersion();
+            if (version == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
+            }
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with name '" + 
schemaName + "'", e);
+            return null;
         }
 
         final String schemaText = versionInfo.getSchemaText();
@@ -254,40 +265,54 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
 
 
     @Override
-    public String retrieveSchemaText(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException {
+    public String retrieveSchemaText(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-        if (info == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
-        }
 
-        final SchemaMetadata metadata = info.getSchemaMetadata();
-        final String schemaName = metadata.getName();
+        try {
+            final SchemaMetadataInfo info = 
client.getSchemaMetadataInfo(schemaId);
+            if (info == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
+            }
 
-        final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
-        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, 
schemaVersionKey);
-        if (versionInfo == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
-        }
+            final SchemaMetadata metadata = info.getSchemaMetadata();
+            final String schemaName = metadata.getName();
+
+            final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
+            final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, 
schemaVersionKey);
+            if (versionInfo == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
+            }
 
-        return versionInfo.getSchemaText();
+            return versionInfo.getSchemaText();
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with ID '" + schemaId + 
"' and version '" + version + "'", e);
+            return null;
+        }
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException {
+    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
-        final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId);
-        if (info == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
-        }
 
-        final SchemaMetadata metadata = info.getSchemaMetadata();
-        final String schemaName = metadata.getName();
+        final String schemaName;
+        final SchemaVersionInfo versionInfo;
+        try {
+            final SchemaMetadataInfo info = 
client.getSchemaMetadataInfo(schemaId);
+            if (info == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
+            }
+
+            final SchemaMetadata metadata = info.getSchemaMetadata();
+            schemaName = metadata.getName();
 
-        final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
-        final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, 
schemaVersionKey);
-        if (versionInfo == null) {
-            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
+            final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
+            versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+            if (versionInfo == null) {
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
+            }
+        } catch (final Exception e) {
+            handleException("Failed to retrieve schema with ID '" + schemaId + 
"' and version '" + version + "'", e);
+            return null;
         }
 
         final String schemaText = versionInfo.getSchemaText();
@@ -300,6 +325,32 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
         });
     }
 
+    // The schema registry client wraps all IOExceptions in RuntimeException. 
So if an IOException occurs, we don't know
+    // that it was an IO problem. So we will look through the Exception's 
cause chain to see if there is an IOException present.
+    private void handleException(final String message, final Exception e) 
throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {
+        if (containsIOException(e)) {
+            throw new IOException(message, e);
+        }
+
+        throw new 
org.apache.nifi.schema.access.SchemaNotFoundException(message, e);
+    }
+
+    private boolean containsIOException(final Throwable t) {
+        if (t == null) {
+            return false;
+        }
+
+        if (t instanceof IOException) {
+            return true;
+        }
+
+        final Throwable cause = t.getCause();
+        if (cause == null) {
+            return false;
+        }
+
+        return containsIOException(cause);
+    }
 
     @Override
     public Set<SchemaField> getSuppliedSchemaFields() {

Reply via email to