Thanks Koji, I will look into the article about the Record model. By the way, the error I mentioned to you previously occurred again. This time I had turned on verbose NiFi logging, and I could see in the log that the SQL query was executed twice. The SQL query is as follows:
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug; alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

And it was executed again shortly afterwards:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug; alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
	at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
	at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
	at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
	at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
	at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
	at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
	at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile Repository failed to update", and I'm not sure whether this is the reason the FlowFile got processed twice. Could you help take a look at my log file? Thanks. You can get the log file via this link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben

2017-12-27 10:00 GMT+08:00 Koji Kawamura <[email protected]>:

> Hi Ben,
>
> This blog post written by Mark would be a good starting point to get
> familiar with the NiFi Record model.
> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>
> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
> pair is not supported at the moment.
> If you need high availability, RedisDistributedMapCacheClientService
> with Redis replication will provide that, though I haven't tried it myself.
> https://redis.io/topics/replication
>
> Thanks,
> Koji
>
> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <[email protected]> wrote:
> > Thanks for your quick response, Koji. I hadn't heard of or seen anything
> > about the NiFi Record data model when I was reading the NiFi
> > documentation; could you tell me where this model is documented? Thanks.
> >
> > By the way, to my knowledge, when you use the DistributedMapCacheServer
> > from DistributedMapCacheClientService, you need to specify the host URL of
> > the server. This means that inside a NiFi cluster, if the node hosting the
> > cache server suddenly goes down, I can't use the cache until that node
> > comes back up, right? Is there currently a cache server in NiFi that
> > supports HA? Thanks.
> >
> > Regards,
> > Ben
> >
> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <[email protected]>:
> >
> >> Hi Ben,
> >>
> >> As you found from the existing code, DistributedMapCache is used to share
> >> state among different processors, and it can be used by your custom
> >> processors, too.
> >> However, I'd recommend avoiding such tight dependencies between
> >> FlowFiles if possible, or at least minimizing the part of the flow that
> >> requires that constraint, for better performance and simplicity.
> >> For example, since a FlowFile can hold a fairly large amount of data,
> >> you could merge all FlowFiles into a single FlowFile instead of batches
> >> of FlowFiles. If you need logical boundaries, you can use the NiFi Record
> >> data model to embed multiple records within a FlowFile; Record should
> >> perform better.
> >>
> >> Hope this helps.
> >>
> >> Thanks,
> >> Koji
> >>
> >>
> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <[email protected]> wrote:
> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
> >> > status between my custom processors.
> >> > Our requirement is as follows: we're doing some ETL work using NiFi, and
> >> > I'm extracting the data from a DB into batches of FlowFiles (each batch
> >> > of FlowFiles has a flag FlowFile indicating the end of the batch).
> >> > There are some groups of custom processors downstream that need to
> >> > process these FlowFiles to do some business-logic work, and we expect
> >> > these processors to process one batch of FlowFiles at a time.
> >> > Therefore we need to implement a custom Wait processor (let's just call
> >> > it WaitBatch here) to hold all the other batches of FlowFiles while the
> >> > business processors are handling the batch of FlowFiles that was
> >> > created earlier.
> >> >
> >> > To implement this, all the WaitBatch processors placed in the flow
> >> > need to read/update records in a shared map so that each set of
> >> > business-logic processors processes one batch at a time.
> >> > The entries are keyed by the batch number of the FlowFiles, and the
> >> > value of each entry is a batch release counter, which counts the number
> >> > of times the batch of FlowFiles has passed through a WaitBatch
> >> > processor.
> >> > When a batch is released by WaitBatch, WaitBatch will try to increment
> >> > the batch number entry's value by 1, and the released batch number and
> >> > counter value will also be saved locally in WaitBatch with the
> >> > StateManager. When the next batch reaches WaitBatch, it will check
> >> > whether the counter value of the previously released batch number in
> >> > the shared map is greater than the one saved locally. If the entry for
> >> > that batch number doesn't exist (already removed) or the value in the
> >> > shared map is greater, the next batch will be released, and the local
> >> > state and the entry in the shared map will be updated in the same way.
> >> > At the end of the flow, a custom processor will get the batch number
> >> > from each batch and remove its entry from the shared map.
> >> >
> >> > So this implementation requires a shared map that can be read and
> >> > updated frequently and atomically. I checked the Wait/Notify processors
> >> > in NiFi and saw that they use the DistributedMapCacheClientService and
> >> > DistributedMapCacheServer to sync status, so I'm wondering if I could
> >> > use the DistributedMapCacheClientService to implement my logic. I also
> >> > saw another implementation called RedisDistributedMapCacheClientService,
> >> > which seems to require Redis (I haven't used Redis). Thanks in advance
> >> > for any suggestions.
> >> >
> >> > Regards,
> >> > Ben
> >> >
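A minimal Java sketch of the WaitBatch release check described in the original message may make the protocol easier to discuss. It is an illustration under stated assumptions, not a NiFi processor: a local ConcurrentHashMap stands in for the shared map (DistributedMapCacheClientService or RedisDistributedMapCacheClientService in the real flow), two plain fields stand in for the state each WaitBatch would keep with StateManager, and the names WaitBatchSketch, WaitBatchGate, tryRelease and finishBatch are hypothetical. ConcurrentMap.merge provides the atomic increment the design relies on; with a distributed cache you would need an equivalent atomic operation (for example a compare-and-set on the entry), which this sketch does not show.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of the WaitBatch release protocol; not a NiFi processor.
public class WaitBatchSketch {

    // One WaitBatch instance. The two fields stand in for its StateManager state.
    static final class WaitBatchGate {
        private final ConcurrentMap<Long, Long> sharedMap; // batch number -> release counter
        private Long lastReleasedBatch;                     // null until the first release
        private long lastReleasedCounter;                   // counter value after that release

        WaitBatchGate(ConcurrentMap<Long, Long> sharedMap) {
            this.sharedMap = sharedMap;
        }

        // Returns true if the batch may pass this WaitBatch now.
        synchronized boolean tryRelease(long batch) {
            if (lastReleasedBatch != null) {
                if (lastReleasedBatch.longValue() == batch) {
                    return true; // this batch was already released here; no new increment
                }
                Long shared = sharedMap.get(lastReleasedBatch);
                // Entry still present and not incremented since our release: the previous
                // batch has neither passed the next WaitBatch nor been removed at the end.
                if (shared != null && shared <= lastReleasedCounter) {
                    return false;
                }
            }
            // Atomically increment this batch's release counter (creates it with 1 if absent).
            long counter = sharedMap.merge(batch, 1L, Long::sum);
            lastReleasedBatch = batch;
            lastReleasedCounter = counter;
            return true;
        }
    }

    // Called by the end-of-flow processor once a batch has been fully processed.
    static void finishBatch(ConcurrentMap<Long, Long> sharedMap, long batch) {
        sharedMap.remove(batch);
    }

    public static void main(String[] args) {
        ConcurrentMap<Long, Long> shared = new ConcurrentHashMap<>();
        WaitBatchGate first = new WaitBatchGate(shared);  // guards business segment 1
        WaitBatchGate second = new WaitBatchGate(shared); // guards business segment 2

        System.out.println(first.tryRelease(1));  // true: nothing released yet
        System.out.println(first.tryRelease(2));  // false: batch 1 still in segment 1
        System.out.println(second.tryRelease(1)); // true: segment 2 is empty
        System.out.println(first.tryRelease(2));  // true: batch 1 has left segment 1
        System.out.println(second.tryRelease(2)); // false: batch 1 still in segment 2
        finishBatch(shared, 1);                   // end of flow removes batch 1's entry
        System.out.println(second.tryRelease(2)); // true: entry for batch 1 is gone
    }
}
```

In the demo, `first` and `second` play the two WaitBatch processors in front of consecutive groups of business processors. Batch 2 is let into a segment only after batch 1's shared counter shows it has passed the next WaitBatch, or after the end-of-flow processor has removed batch 1's entry, which matches the two release conditions in the message.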
