GitHub user mans2singh commented on the pull request:
https://github.com/apache/nifi/pull/239#issuecomment-218325113
Hi NiFi folks (@apiri, @olegz, @joewitt),
Thanks for your review comments. I apologize for the delay; I had to take care of work-related assignments and could not get back to your review comments earlier. I would like to discuss this pull request with you and get your advice/recommendations on it.
Here is a summary of some of the changes I made to integrate the Kinesis consumer/producer into the AWS NiFi processors. You are probably already familiar with these, but I want to make sure we are on the same page so that you can correct/advise me. Currently, the AWS NiFi processors extend the class:
```java
public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient>
        extends AbstractAWSProcessor<ClientType>
```
All the current AWS NiFi processors use an `AmazonWebServiceClient` to connect to the AWS services, and there is a similar client for Kinesis: `AmazonKinesisClient`. However, AWS also provides a newer pair of libraries: `KinesisProducer` from the Kinesis Producer Library (KPL), and the `KinesisRecordProcessor`-based `Worker` from the Kinesis Client Library (KCL). These libraries handle batching, record packing, parallelism, resharding, workload balancing, monitoring, asynchronous processing, retries, etc., and using them does not require an `AmazonWebServiceClient`-based client class. I therefore chose to implement the NiFi AWS Kinesis Get and Put processors with KCL/KPL rather than the older `AmazonKinesisClient`.
I could have implemented the NiFi Kinesis processors separately, but I thought it would be better to have a common base class hierarchy for all NiFi AWS processors. To keep things consistent across all of them (both those that use `AmazonWebServiceClient` and those that don't, e.g. `KinesisProducer`), I created a base class, `AbstractBaseAWSProcessor`, which is a simple, non-generic class. It has two subclasses:
- `AbstractAWSProcessor`, the superclass of all NiFi AWS processors that use the generic `AmazonWebServiceClient` (e.g. S3)
- `AbstractKinesisProcessor`, which does not use `AmazonWebServiceClient` and instead uses the KPL/KCL classes.
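To make the shape of this hierarchy concrete, here is a minimal sketch built entirely from hypothetical stand-in types (`WebServiceClient`, `BaseAwsProcessor`, `ClientBackedProcessor`, `KinesisBackedProcessor` and the two concrete classes are illustrative names, not the real NiFi or AWS SDK classes). It only shows the idea: a non-generic root, a generic branch parameterized by a client type, and a non-generic Kinesis branch.

```java
import java.util.Objects;

/** Sketch of the proposed hierarchy; every type here is a hypothetical stand-in. */
public class AwsHierarchySketch {

    /** Stand-in for AmazonWebServiceClient (hypothetical). */
    public interface WebServiceClient {
        String serviceName();
    }

    /** Non-generic root: holds what every AWS processor shares (here, only the region). */
    public abstract static class BaseAwsProcessor {
        private String region = "us-east-1";

        public void setRegion(String region) {
            this.region = Objects.requireNonNull(region);
        }

        public String getRegion() {
            return region;
        }

        /** Each branch describes how it talks to AWS. */
        public abstract String describeTransport();
    }

    /** Generic branch for processors backed by a web-service client (e.g. S3). */
    public abstract static class ClientBackedProcessor<C extends WebServiceClient> extends BaseAwsProcessor {
        private C client;

        protected abstract C createClient(String region);

        /** Lazily creates the client on first use and then reuses it. */
        public C getClient() {
            if (client == null) {
                client = createClient(getRegion());
            }
            return client;
        }

        @Override
        public String describeTransport() {
            return "client:" + getClient().serviceName() + "@" + getRegion();
        }
    }

    /** Non-generic branch for processors that use KPL/KCL instead of a web-service client. */
    public abstract static class KinesisBackedProcessor extends BaseAwsProcessor {
        @Override
        public String describeTransport() {
            return "kpl-kcl@" + getRegion();
        }
    }

    /** Hypothetical S3-style processor. */
    public static class S3LikeProcessor extends ClientBackedProcessor<WebServiceClient> {
        @Override
        protected WebServiceClient createClient(String region) {
            return () -> "s3";
        }
    }

    /** Hypothetical PutKinesis-style processor. */
    public static class PutKinesisLikeProcessor extends KinesisBackedProcessor {
    }

    public static void main(String[] args) {
        BaseAwsProcessor[] processors = {new S3LikeProcessor(), new PutKinesisLikeProcessor()};
        for (BaseAwsProcessor p : processors) {
            System.out.println(p.describeTransport());
        }
    }
}
```

Because the root is not generic, code that only needs shared configuration can hold a `BaseAwsProcessor` reference without caring which branch it came from.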
I used `AbstractSessionFactoryProcessor` rather than `AbstractProcessor` as the base class for `AbstractBaseAWSProcessor`. The reason is that the KCL `Worker` asynchronously invokes an `IRecordProcessorFactory` to create an `IRecordProcessor`, which it then uses to handle records as they arrive by invoking the following method:
`void processRecords(ProcessRecordsInput processRecordsInput)`
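The callback pattern described above can be sketched with simplified, hypothetical interfaces. `RecordProcessor`, `RecordProcessorFactory` and `runToyWorker` are illustrative stand-ins, not the real KCL `IRecordProcessor`/`IRecordProcessorFactory` API (which passes a `ProcessRecordsInput` and has further lifecycle methods). The point is only that the factory and `processRecords` are called on threads the processor does not control, outside of any `onTrigger` call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RecordProcessorFactorySketch {

    /** Simplified stand-in for KCL's IRecordProcessor; not the real interface. */
    public interface RecordProcessor {
        void processRecords(List<String> records);
    }

    /** Simplified stand-in for KCL's IRecordProcessorFactory; not the real interface. */
    public interface RecordProcessorFactory {
        RecordProcessor createProcessor();
    }

    /**
     * Toy "worker": for each shard's batch it asks the factory for a new
     * processor on a pool thread and hands the batch to processRecords.
     * Returns once all submitted batches finish (or after 5 seconds).
     */
    public static void runToyWorker(RecordProcessorFactory factory, List<List<String>> batchesPerShard) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (List<String> batch : batchesPerShard) {
            pool.submit(() -> factory.createProcessor().processRecords(batch));
        }
        pool.shutdown(); // accept no new tasks; already-submitted batches still run
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Records arrive on pool threads, so collect them in a synchronized list.
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        RecordProcessorFactory factory = () -> records -> seen.addAll(records);
        runToyWorker(factory, Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        List<String> sorted = new ArrayList<>(seen);
        Collections.sort(sorted);
        System.out.println(sorted); // [a, b, c]
    }
}
```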
To create a `ProcessSession` on demand (for GetKinesis), I found that we need access to the `ProcessSessionFactory`. `AbstractBaseAWSProcessor` therefore extends `AbstractSessionFactoryProcessor` and has two `onTrigger` methods that cover both cases: the current NiFi AWS processors and PutKinesis, which work with the session of the current trigger, and GetKinesis, which creates sessions on demand as records arrive. Here is the relevant code:
```java
// Creates a session per trigger, delegates to the subclass, and commits;
// on any failure the session is rolled back and the error is rethrown.
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
    final ProcessSession session = sessionFactory.createSession();
    try {
        onTrigger(context, session);
        session.commit();
    } catch (final Throwable t) {
        getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
        session.rollback(true);
        throw t;
    }
}

// Subclasses implement their per-trigger logic here.
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
```
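For GetKinesis, the session has to be created from inside the record callback instead of inside `onTrigger`. The sketch below shows that idea with hypothetical stand-ins: `ToySession` and `ToySessionFactory` only imitate the commit/rollback contract of NiFi's `ProcessSession`/`ProcessSessionFactory`, and `GetKinesisLikeRecordHandler` is an illustrative name. Each arriving batch gets its own session, which is committed on success and rolled back on failure, mirroring the try/commit/rollback shape of `onTrigger`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class OnDemandSessionSketch {

    /** Stand-in for ProcessSession: buffers output until commit, drops it on rollback. */
    public static class ToySession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> committedSink;
        private final AtomicInteger rollbackCounter;

        ToySession(List<String> committedSink, AtomicInteger rollbackCounter) {
            this.committedSink = committedSink;
            this.rollbackCounter = rollbackCounter;
        }

        public void transfer(String content) {
            pending.add(content);
        }

        public void commit() {
            committedSink.addAll(pending);
            pending.clear();
        }

        public void rollback() {
            pending.clear();
            rollbackCounter.incrementAndGet();
        }
    }

    /** Stand-in for ProcessSessionFactory; hands out independent sessions. */
    public static class ToySessionFactory {
        private final List<String> committed = Collections.synchronizedList(new ArrayList<>());
        private final AtomicInteger rollbacks = new AtomicInteger();

        public ToySession createSession() {
            return new ToySession(committed, rollbacks);
        }

        public List<String> committed() {
            synchronized (committed) {
                return new ArrayList<>(committed);
            }
        }

        public int rollbackCount() {
            return rollbacks.get();
        }
    }

    /** Hypothetical GetKinesis-style record handler: one session per arriving batch. */
    public static class GetKinesisLikeRecordHandler {
        private final ToySessionFactory sessionFactory;

        public GetKinesisLikeRecordHandler(ToySessionFactory sessionFactory) {
            this.sessionFactory = sessionFactory;
        }

        /** Returns true if the batch was committed, false if it was rolled back. */
        public boolean processRecords(List<String> records) {
            ToySession session = sessionFactory.createSession();
            try {
                for (String record : records) {
                    if (record.isEmpty()) {
                        throw new IllegalArgumentException("empty record");
                    }
                    session.transfer(record);
                }
                session.commit();
                return true;
            } catch (RuntimeException e) {
                session.rollback();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        ToySessionFactory factory = new ToySessionFactory();
        GetKinesisLikeRecordHandler handler = new GetKinesisLikeRecordHandler(factory);
        handler.processRecords(Arrays.asList("r1", "r2"));
        handler.processRecords(Arrays.asList("r3", ""));
        System.out.println(factory.committed() + " rollbacks=" + factory.rollbackCount()); // [r1, r2] rollbacks=1
    }
}
```

Whether a failed batch should be rethrown, retried or only logged depends on how the KCL worker treats exceptions from `processRecords`; the sketch simply rolls back and reports `false`.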
Finally, the `Worker` used in `GetKinesis` is a `Runnable`, so I used an `ExecutorService` to launch it and manage its lifecycle.
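A minimal sketch of that lifecycle, assuming a worker that (like the description of `consumerWorker.shutdown()` in this comment) stops by setting a flag that its run loop checks. `ToyWorker` and `WorkerHost` are hypothetical stand-ins, not the KCL `Worker` or the actual GetKinesis code. The host submits the worker to a single-thread executor, and stopping it means flipping the flag, shutting down the executor, and waiting a bounded time for the thread to exit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkerLifecycleSketch {

    /** Stand-in for a KCL-style Worker: loops until a shutdown flag is set. */
    public static class ToyWorker implements Runnable {
        private volatile boolean shutdownRequested = false;
        private final AtomicInteger polls = new AtomicInteger();

        @Override
        public void run() {
            while (!shutdownRequested) {
                polls.incrementAndGet(); // placeholder for fetching and dispatching records
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Only sets the flag; the run loop notices it on its next iteration. */
        public void shutdown() {
            shutdownRequested = true;
        }

        public boolean isShutdownRequested() {
            return shutdownRequested;
        }

        public int pollCount() {
            return polls.get();
        }
    }

    /** Hypothetical holder a GetKinesis-style processor could use for the worker thread. */
    public static class WorkerHost {
        private ExecutorService executor;
        private ToyWorker worker;

        public void start() {
            worker = new ToyWorker();
            executor = Executors.newSingleThreadExecutor();
            executor.submit(worker);
        }

        /** Requests shutdown and returns true if the worker thread exited within the timeout. */
        public boolean stop(long timeoutMillis) {
            if (worker == null) {
                return true; // never started
            }
            worker.shutdown();
            executor.shutdown();
            try {
                boolean done = executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
                if (!done) {
                    executor.shutdownNow(); // interrupt the worker as a last resort
                }
                return done;
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
                return false;
            }
        }

        public ToyWorker getWorker() {
            return worker;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkerHost host = new WorkerHost();
        host.start();
        Thread.sleep(50); // let the worker poll a few times
        System.out.println("stopped cleanly: " + host.stop(1000));
    }
}
```

In a NiFi processor, `start()` could be called from an `@OnScheduled` method and `stop()` from an `@OnStopped` method; since the flag-based shutdown does not throw, the bounded `awaitTermination` is what guards against a worker that never exits.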
I've migrated the base classes of all the current NiFi AWS processors to work with the new class hierarchy.
If you have any thoughts/recommendations/etc on how I can implement the
Kinesis processors using KCL/KPL please let me know.
To address the review comments from @olegz:
1. Regarding the empty `onScheduled` method and constructor: I can remove them.
2. Regarding subclassing `AbstractProcessor`: I used `AbstractSessionFactoryProcessor` instead to get on-demand access to the `ProcessSessionFactory`.
3. Regarding `consumerWorker.shutdown()` throwing an exception: the `shutdown` method on the worker only sets a shutdown flag and does not throw any exception. But if you have any advice, let me know.
Thanks again for your advice/support and I hope to get your feedback
incorporated as soon as I can.
Mans