Hi Ben,

The following two log messages are very close in terms of written
timestamp, but have different log level.
2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR

I guess those are logged within a single onTrigger of your
ExecuteSqlCommand custom processor, one is before executing, the other
is when it caught an exception. Just guessing as I don't have access
to the code.

Does the same issue happen with other processors bundled with Apache
NiFi without your custom processor running?

If NiFi fails to update/checkpoint FlowFile repository, then the same
FlowFile can be processed again after restarting NiFi.

Thanks,
Koji



On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> Thanks Koji, I will look into this article about the record model.
>
> By the way, that error I previously mentioned to you occurred again, I
> could see the sql query was executed twice in the log, this time I had
> turned on the verbose NiFi logging, the sql query is as below:
>
> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句: SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>
> and it was executed again later:
>
> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] 执行sql语句失败:SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> _id;: com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
> 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
> 'ods_extractDataDebug_20171226031801926_9195' 的对象。
> at
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> at
> com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> at
> com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> at
> org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> at
> org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> at
> com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> at
> com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I also saw a lot of NiFi's exception like "ProcessException: FlowFile
> Repository failed to update", not sure if this is the reason the FlowFile
> got processed twice.  Could you help to take a look at my log file? Thanks.
> You could get the log file via the link:
> https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>
> Best Regards,
> Ben
>
> 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> This blog post written by Mark, would be a good starting point to get
>> familiar with NiFi Record model.
>> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>
>> HA for DistributedMapCacheClientService and DistributedMapCacheServer
>> pair is not supported at the moment. If you need HighAvailability,
>> RedisDistributedMapCacheClientService with Redis replication will
>> provide that, I haven't tried that myself though.
>> https://redis.io/topics/replication
>>
>> Thanks,
>> Koji
>>
>> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Thanks for your quick response, Koji, I haven't heard and seen anything
>> > about the NiFi record data model when I was reading the NiFi
>> > documentations,could you tell me where this model is documented? Thanks.
>> >
>> > By the way, to my knowledge, when you need to use the
>> DistributedMapCacheServer
>> > from DistributedMapCacheClientService, you need to specify the host url
>> for
>> > the server, this means inside a NiFi cluster
>> > when I specify the cache server and the node suddenly went down, I
>> couldn't
>> > possibly use it until the node goes up again right? Is there currently
>> such
>> > a cache server in NiFi that could support HA? Thanks.
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >
>> >> Hi Ben,
>> >>
>> >> As you found from existing code, DistributedMapCache is used to share
>> >> state among different processors, and it can be used by your custom
>> >> processors, too.
>> >> However, I'd recommend to avoid such tight dependencies between
>> >> FlowFiles if possible, or minimize the part in flow that requires that
>> >> constraint at least for better performance and simplicity.
>> >> For example, since a FlowFile can hold fairly large amount of data,
>> >> you could merge all FlowFiles in a single FlowFile, instead of batches
>> >> of FlowFiles. If you need logical boundaries, you can use NiFi Record
>> >> data model to embed multiple records within a FlowFile, Record should
>> >> perform better.
>> >>
>> >> Hope this helps.
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >>
>> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> > Hi guys, I'm currently trying to find a proper way in nifi which could
>> >> sync
>> >> > status between my custom processors.
>> >> > our requirement is like this, we're doing some ETL work using nifi and
>> >> I'm
>> >> > extracting the data from DB into batches of FlowFiles(each batch of
>> >> > FlowFile has a flag FlowFile indicating the end of the batch).
>> >> > There're some groups of custom processors downstream that need to
>> process
>> >> > these FlowFiles to do some business logic work. And we expect these
>> >> > processors to process one batch of FlowFiles at a time.
>> >> > Therefore we need to implement a custom Wait processor(let's just
>> call it
>> >> > WaitBatch here) to hold all the other batches of FlowFiles while the
>> >> > business processors were handling the batch of FlowFiles whose
>> creation
>> >> > time is earlier.
>> >> >
>> >> > In order to implement this, all the WaitBatch processors placed in the
>> >> flow
>> >> > need to read/update records in a shared map so that each set of
>> >> > business-logic processors process one batch at a time.
>> >> > The entries are keyed using the batch number of the FlowFiles and the
>> >> value
>> >> > of each entry is a batch release counter number which counts the
>> number
>> >> of
>> >> > times the batch of FlowFiles has passed through
>> >> > a WaitBatch processor.
>> >> > When a batch is released by WaitBatch, it will try to increment the
>> batch
>> >> > number entry's value by 1 and then the released batch number and
>> counter
>> >> > number will also be saved locally at the WaitBatch with StateManager;
>> >> > when the next batch reaches the WaitBatch, it will check if the
>> counter
>> >> > value of the previous released batch number in the shared map is
>> greater
>> >> > than the one saved locally, if the entry for the batch number does't
>> >> > exist(already removed) or the value in the shared map is greater, the
>> >> next
>> >> > batch will be released and the local state and the entry on the shared
>> >> map
>> >> > will be updated similarly.
>> >> > In the end of the flow, a custom processor will get the batch number
>> from
>> >> > each batch and remove the entry from the shared map .
>> >> >
>> >> > So this implementation requires a shared map that could read/update
>> >> > frequently and atomically. I checked the Wait/Notify processors in
>> NIFI
>> >> and
>> >> > saw it is using the DistributedMapCacheClientService and
>> >> > DistributedMapCacheServer to sync status, so I'm wondering if I could
>> use
>> >> > the DistributedMapCacheClientService to implement my logic. I also
>> saw
>> >> > another implementation called RedisDistributedMapCacheClientService
>> >> > which seems to require Redis(I haven't used Redis).  Thanks in advance
>> >> for
>> >> > any suggestions.
>> >> >
>> >> > Regards,
>> >> > Ben
>> >>
>>

Reply via email to