Hi Ben,

The following two log messages are very close in timestamp but have different log levels:

2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR
I guess those are logged within a single onTrigger of your ExecuteSqlCommand custom processor: one before executing, the other when it caught an exception. Just guessing, as I don't have access to the code.

Does the same issue happen with other processors bundled with Apache NiFi when your custom processor is not running? If NiFi fails to update/checkpoint the FlowFile repository, the same FlowFile can be processed again after restarting NiFi.

Thanks,
Koji

On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <batman...@gmail.com> wrote:
> Thanks Koji, I will look into this article about the record model.
>
> By the way, the error I previously mentioned to you occurred again; I
> could see in the log that the SQL query was executed twice. This time I
> had turned on verbose NiFi logging. The SQL query is as below:
>
> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>
> and it was executed again later:
>
> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
> c.z.nifi.processors.ExecuteSqlCommand
> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
> dbo.ods_extractDataDebug;
> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column
> _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already
> an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
> com.microsoft.sqlserver.jdbc.SQLServerException: There is already
> an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
>     at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>     at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>     at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>     at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>     at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>     at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>     at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>     at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
> Repository failed to update"; I'm not sure if this is the reason the
> FlowFile got processed twice. Could you help take a look at my log file?
> Thanks. You can get the log file via this link:
> https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>
> Best Regards,
> Ben
>
> 2017-12-27 10:00 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>
>> Hi Ben,
>>
>> This blog post written by Mark would be a good starting point for
>> getting familiar with the NiFi Record model:
>> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>
>> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
>> pair is not supported at the moment. If you need high availability,
>> RedisDistributedMapCacheClientService with Redis replication will
>> provide that, though I haven't tried it myself.
>> https://redis.io/topics/replication
>>
>> Thanks,
>> Koji
>>
>> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <batman...@gmail.com> wrote:
>> > Thanks for your quick response, Koji. I hadn't seen anything
>> > about the NiFi Record data model when I was reading the NiFi
>> > documentation; could you tell me where this model is documented? Thanks.
>> >
>> > By the way, to my knowledge, when you use the DistributedMapCacheServer
>> > through DistributedMapCacheClientService, you need to specify the host
>> > URL of the server. This means that inside a NiFi cluster, if the node
>> > I point the cache client at suddenly goes down, I can't use the cache
>> > until that node comes back up, right? Is there currently such
>> > a cache server in NiFi that could support HA? Thanks.
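
[For reference: the client side of this pair reads and writes cache entries through Serializer/Deserializer callbacks for keys and values. The sketch below shows string versions; the two interfaces are reproduced here as simplified stand-ins for the NiFi ones (assumed shapes, so the snippet compiles without the NiFi dependency).]

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Simplified stand-ins for the Serializer/Deserializer interfaces in
// org.apache.nifi.distributed.cache.client (shapes assumed for illustration).
interface Serializer<T> {
    void serialize(T value, OutputStream out) throws IOException;
}

interface Deserializer<T> {
    T deserialize(byte[] input) throws IOException;
}

// One class serving as both key and value serde for UTF-8 strings,
// usable for both arguments of the cache client's get/put calls.
class StringSerde implements Serializer<String>, Deserializer<String> {
    @Override
    public void serialize(String value, OutputStream out) throws IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String deserialize(byte[] input) throws IOException {
        // The cache returns null for a missing key; preserve that.
        return input == null ? null : new String(input, StandardCharsets.UTF_8);
    }
}
```

[With the real client a processor would call something like client.get(key, serde, serde); while the single node hosting DistributedMapCacheServer is down, such calls simply fail until it comes back, which is the HA limitation being discussed.]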
>> >
>> > Regards,
>> > Ben
>> >
>> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <ijokaruma...@gmail.com>:
>> >
>> >> Hi Ben,
>> >>
>> >> As you found from the existing code, DistributedMapCache is used to
>> >> share state among different processors, and it can be used by your
>> >> custom processors, too.
>> >> However, I'd recommend avoiding such tight dependencies between
>> >> FlowFiles if possible, or at least minimizing the part of the flow
>> >> that requires that constraint, for better performance and simplicity.
>> >> For example, since a FlowFile can hold a fairly large amount of data,
>> >> you could merge all the FlowFiles into a single FlowFile instead of
>> >> using batches of FlowFiles. If you need logical boundaries, you can
>> >> use the NiFi Record data model to embed multiple records within a
>> >> FlowFile; Records should perform better.
>> >>
>> >> Hope this helps.
>> >>
>> >> Thanks,
>> >> Koji
>> >>
>> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <batman...@gmail.com> wrote:
>> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
>> >> > state between my custom processors.
>> >> > Our requirement is like this: we're doing some ETL work using NiFi,
>> >> > and I'm extracting the data from a DB into batches of FlowFiles
>> >> > (each batch has a flag FlowFile indicating the end of the batch).
>> >> > There are some groups of custom processors downstream that need to
>> >> > process these FlowFiles to do some business-logic work, and we
>> >> > expect these processors to process one batch of FlowFiles at a time.
>> >> > Therefore we need to implement a custom Wait processor (let's call
>> >> > it WaitBatch here) to hold all the other batches of FlowFiles while
>> >> > the business processors are handling the batch of FlowFiles whose
>> >> > creation time is earlier.
>> >> >
>> >> > In order to implement this, all the WaitBatch processors placed in
>> >> > the flow need to read/update records in a shared map so that each
>> >> > set of business-logic processors processes one batch at a time.
>> >> > The entries are keyed by the batch number of the FlowFiles, and the
>> >> > value of each entry is a release counter that counts the number of
>> >> > times the batch of FlowFiles has passed through a WaitBatch
>> >> > processor.
>> >> > When a batch is released by a WaitBatch, it increments the batch
>> >> > number entry's value by 1, and the released batch number and counter
>> >> > value are also saved locally at the WaitBatch with the StateManager;
>> >> > when the next batch reaches the WaitBatch, it checks whether the
>> >> > counter value of the previously released batch number in the shared
>> >> > map is greater than the one saved locally. If the entry for that
>> >> > batch number doesn't exist (already removed) or the value in the
>> >> > shared map is greater, the next batch is released, and the local
>> >> > state and the entry in the shared map are updated in the same way.
>> >> > At the end of the flow, a custom processor gets the batch number
>> >> > from each batch and removes the entry from the shared map.
>> >> >
>> >> > So this implementation requires a shared map that can be read and
>> >> > updated frequently and atomically. I checked the Wait/Notify
>> >> > processors in NiFi and saw they use DistributedMapCacheClientService
>> >> > and DistributedMapCacheServer to sync state, so I'm wondering if I
>> >> > could use DistributedMapCacheClientService to implement my logic.
>> >> > I also saw another implementation called
>> >> > RedisDistributedMapCacheClientService, which seems to require Redis
>> >> > (I haven't used Redis). Thanks in advance for any suggestions.
>> >> >
>> >> > Regards,
>> >> > Ben
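
[The WaitBatch bookkeeping described above can be sketched roughly as follows, with a plain ConcurrentHashMap standing in for the shared map that would be backed by DistributedMapCacheServer. Class and method names here are hypothetical; a real implementation would also need the cache client's atomic-update guarantees for the increment, and would keep the two local fields via the StateManager.]

```java
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the shared map normally held by DistributedMapCacheServer.
// Key: batch number; value: how many WaitBatch processors released it.
class BatchReleaseTracker {
    private final ConcurrentHashMap<String, Long> shared = new ConcurrentHashMap<>();

    // Local state each WaitBatch would persist via the StateManager.
    private String lastReleasedBatch;
    private long lastReleasedCount;

    /** Returns true if the next batch may pass this WaitBatch now. */
    boolean tryRelease(String nextBatch) {
        if (lastReleasedBatch != null) {
            Long sharedCount = shared.get(lastReleasedBatch);
            // Previous batch is done here if its entry was removed at the end
            // of the flow, or if a downstream WaitBatch advanced its counter.
            boolean previousDone = sharedCount == null || sharedCount > lastReleasedCount;
            if (!previousDone) {
                return false; // previous batch still in flight; hold this one
            }
        }
        // Release: atomically bump the counter for the batch we let through,
        // then remember batch number and counter value locally.
        long newCount = shared.merge(nextBatch, 1L, Long::sum);
        lastReleasedBatch = nextBatch;
        lastReleasedCount = newCount;
        return true;
    }

    /** The end-of-flow processor removes the entry once a batch completes. */
    void complete(String batch) {
        shared.remove(batch);
    }
}
```

[Note the release check and the counter increment are two separate operations here; with several WaitBatch instances on different nodes, that read-check-increment sequence is exactly the part that needs the cache's atomic operations rather than two independent get/put calls.]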