Hi Koji, I will print the SQL before actually executing it. I checked the error log line you mentioned in your reply; that error was thrown by NiFi from within another processor, called WaitBatch. I didn't find errors similar to the one from the ExecuteSqlCommand processor; I think that's because only ExecuteSqlCommand is used to create temp database tables. You can check my ExecuteSqlCommand code via this link: https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
If the error is really caused by a FlowFile repository checkpoint failure and the FlowFile was executed twice, I may have to create the temp table only if it doesn't exist. I didn't fix the bug this way right away because I was afraid that fix could mask some other problem. Thanks.

Regards,
Ben

2017-12-27 11:38 GMT+08:00 Koji Kawamura <[email protected]>:

> Hi Ben,
>
> The following two log messages are very close in terms of written
> timestamp, but have different log levels:
> 2017-12-26 07:00:01,312 INFO
> 2017-12-26 07:00:01,315 ERROR
>
> I guess those are logged within a single onTrigger of your
> ExecuteSqlCommand custom processor: one before executing, the other
> when it caught an exception. Just guessing, as I don't have access
> to the code.
>
> Does the same issue happen with other processors bundled with Apache
> NiFi, without your custom processor running?
>
> If NiFi fails to update/checkpoint the FlowFile repository, then the same
> FlowFile can be processed again after restarting NiFi.
>
> Thanks,
> Koji
>
>
> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <[email protected]> wrote:
> > Thanks Koji, I will look into this article about the record model.
> >
> > By the way, the error I previously mentioned to you occurred again; I
> > could see in the log that the SQL query was executed twice. This time I
> > had turned on verbose NiFi logging. The SQL query is as below:
> >
> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] executing SQL statement:
> > SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
> >
> > and it was executed again later:
> >
> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> > c.z.nifi.processors.ExecuteSqlCommand
> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] failed to execute SQL statement:
> > SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named
> > 'ods_extractDataDebug_20171226031801926_9195' in the database.
> > com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named
> > 'ods_extractDataDebug_20171226031801926_9195' in the database.
> >   at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
> >   at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
> >   at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
> >   at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
> >   at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >   at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
> >   at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
> >   at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
> >   at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >   at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
> >   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
> >   at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >   at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> > Repository failed to update"; not sure if this is the reason the FlowFile
> > got processed twice.
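[Editor's note: given the "There is already an object named ..." error above, the table creation could be made idempotent on the SQL Server side, so that a replayed FlowFile becomes a no-op rather than a failure. A minimal sketch; the `IdempotentDdl` class and `guardTableCreation` helper are hypothetical names, not part of the ExecuteSqlCommand processor, and the OBJECT_ID guard is standard T-SQL:]

```java
// Sketch: wrap a CREATE-like statement (here SELECT ... INTO) in a T-SQL
// OBJECT_ID guard so a replayed FlowFile does not fail when the target
// table already exists. OBJECT_ID(name, 'U') returns NULL when no user
// table of that name exists.
public class IdempotentDdl {

    /** Returns the DDL wrapped so it only runs when the table is absent. */
    public static String guardTableCreation(String qualifiedTable, String ddl) {
        return "IF OBJECT_ID(N'" + qualifiedTable + "', N'U') IS NULL\n"
             + "BEGIN\n"
             + "    " + ddl + "\n"
             + "END;";
    }

    public static void main(String[] args) {
        // Table and statement taken from the log above.
        String ddl = "SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 "
                   + "FROM dbo.ods_extractDataDebug;";
        System.out.println(guardTableCreation(
                "tmp.ods_extractDataDebug_20171226031801926_9195", ddl));
    }
}
```

The guarded string would then be sent through the same JDBC Statement the processor already uses; whether silently skipping the create is safe depends on whether a leftover table from a failed run can hold stale rows.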
> > Could you help take a look at my log file? Thanks.
> > You could get the log file via the link:
> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
> >
> > Best Regards,
> > Ben
> >
> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <[email protected]>:
> >
> >> Hi Ben,
> >>
> >> This blog post, written by Mark, would be a good starting point for
> >> getting familiar with the NiFi Record model:
> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
> >>
> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
> >> pair is not supported at the moment. If you need high availability,
> >> RedisDistributedMapCacheClientService with Redis replication will
> >> provide that, though I haven't tried it myself.
> >> https://redis.io/topics/replication
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <[email protected]> wrote:
> >> > Thanks for your quick response, Koji. I haven't heard of or seen
> >> > anything about the NiFi Record data model when reading the NiFi
> >> > documentation; could you tell me where this model is documented? Thanks.
> >> >
> >> > By the way, to my knowledge, when you need to use the DistributedMapCacheServer
> >> > from DistributedMapCacheClientService, you need to specify the host URL
> >> > for the server. This means that inside a NiFi cluster,
> >> > when I specify the cache server and that node suddenly goes down, I
> >> > couldn't possibly use it until the node comes up again, right? Is there
> >> > currently such a cache server in NiFi that could support HA? Thanks.
> >> >
> >> > Regards,
> >> > Ben
> >> >
> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <[email protected]>:
> >> >
> >> >> Hi Ben,
> >> >>
> >> >> As you found from the existing code, DistributedMapCache is used to share
> >> >> state among different processors, and it can be used by your custom
> >> >> processors, too.
> >> >> However, I'd recommend avoiding such tight dependencies between
> >> >> FlowFiles if possible, or at least minimizing the part of the flow
> >> >> that requires that constraint, for better performance and simplicity.
> >> >> For example, since a FlowFile can hold a fairly large amount of data,
> >> >> you could merge all FlowFiles into a single FlowFile, instead of
> >> >> batches of FlowFiles. If you need logical boundaries, you can use the
> >> >> NiFi Record data model to embed multiple records within a FlowFile;
> >> >> Record should perform better.
> >> >>
> >> >> Hope this helps.
> >> >>
> >> >> Thanks,
> >> >> Koji
> >> >>
> >> >>
> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <[email protected]> wrote:
> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to
> >> >> > sync status between my custom processors.
> >> >> > Our requirement is like this: we're doing some ETL work using NiFi,
> >> >> > and I'm extracting the data from the DB into batches of FlowFiles
> >> >> > (each batch of FlowFiles has a flag FlowFile indicating the end of
> >> >> > the batch).
> >> >> > There are some groups of custom processors downstream that need to
> >> >> > process these FlowFiles to do some business-logic work, and we
> >> >> > expect these processors to process one batch of FlowFiles at a time.
> >> >> > Therefore we need to implement a custom Wait processor (let's just
> >> >> > call it WaitBatch here) to hold all the other batches of FlowFiles
> >> >> > while the business processors are handling the batch of FlowFiles
> >> >> > whose creation time is earlier.
> >> >> >
> >> >> > In order to implement this, all the WaitBatch processors placed in
> >> >> > the flow need to read/update records in a shared map so that each
> >> >> > set of business-logic processors processes one batch at a time.
> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
> >> >> > value of each entry is a batch release counter which counts the
> >> >> > number of times the batch of FlowFiles has passed through a
> >> >> > WaitBatch processor.
> >> >> > When a batch is released by a WaitBatch, it will try to increment
> >> >> > the batch number entry's value by 1, and the released batch number
> >> >> > and counter value will also be saved locally at the WaitBatch with
> >> >> > the StateManager;
> >> >> > when the next batch reaches the WaitBatch, it will check whether
> >> >> > the counter value of the previously released batch number in the
> >> >> > shared map is greater than the one saved locally. If the entry for
> >> >> > the batch number doesn't exist (already removed) or the value in
> >> >> > the shared map is greater, the next batch will be released, and the
> >> >> > local state and the entry in the shared map will be updated
> >> >> > similarly.
> >> >> > At the end of the flow, a custom processor will get the batch
> >> >> > number from each batch and remove the entry from the shared map.
> >> >> >
> >> >> > So this implementation requires a shared map that can be read and
> >> >> > updated frequently and atomically. I checked the Wait/Notify
> >> >> > processors in NiFi and saw that they use the
> >> >> > DistributedMapCacheClientService and DistributedMapCacheServer to
> >> >> > sync status, so I'm wondering if I could use the
> >> >> > DistributedMapCacheClientService to implement my logic. I also saw
> >> >> > another implementation called RedisDistributedMapCacheClientService,
> >> >> > which seems to require Redis (I haven't used Redis). Thanks in
> >> >> > advance for any suggestions.
> >> >> >
> >> >> > Regards,
> >> >> > Ben
> >> >
> >>
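[Editor's note: the release rule described in the quoted message above can be modeled separately from any cache service. A minimal sketch under stated assumptions: `BatchGate`, `release`, and `mayReleaseNext` are hypothetical names, and a plain ConcurrentMap stands in for the shared DistributedMapCache; in a real processor the reads and increments would go through the cache client and would need its atomic operations rather than `merge`:]

```java
import java.util.concurrent.ConcurrentMap;

// Models one WaitBatch processor's view of the shared release-counter map:
// batchNumber -> number of WaitBatch gates the batch has passed through.
public class BatchGate {
    private final ConcurrentMap<String, Integer> sharedMap; // stand-in for the shared cache
    private String lastBatch = null;   // stand-in for local state kept via StateManager
    private int lastSeenCount = 0;

    public BatchGate(ConcurrentMap<String, Integer> sharedMap) {
        this.sharedMap = sharedMap;
    }

    /** Release a batch through this gate: bump its shared counter atomically
     *  and remember the value this gate wrote. */
    public void release(String batchNumber) {
        lastSeenCount = sharedMap.merge(batchNumber, 1, Integer::sum);
        lastBatch = batchNumber;
    }

    /** May the batch after the last released one pass this gate? Yes if the
     *  previous entry was removed at the end of the flow, or if a downstream
     *  gate has incremented the counter past the value saved here. */
    public boolean mayReleaseNext() {
        if (lastBatch == null) {
            return true; // nothing released yet
        }
        Integer current = sharedMap.get(lastBatch);
        return current == null || current > lastSeenCount;
    }
}
```

With two gates sharing one map, the first gate blocks its next batch until the second gate (or the cleanup processor removing the entry) has seen the previous one, which matches the ordering described above.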
