I see, thanks. The easiest way to look at provenance events is to right-click the processor instance you are interested in and select 'View data provenance' from the context menu. NiFi then displays provenance events for only the selected processor.
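If you prefer to script this check, the same query can be submitted through NiFi's REST API. Below is a minimal sketch; the /nifi-api/provenance endpoint and the request-body field names are my reading of the NiFi 1.x REST API (and the processor id is a placeholder), so please verify them against your version's API docs and add authentication for a secured instance:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ProvenanceQuery {
    public static void main(String[] args) throws Exception {
        // Submit an asynchronous provenance query scoped to a single processor id.
        final String body = "{\"provenance\":{\"request\":{"
                + "\"maxResults\":100,"
                + "\"searchTerms\":{\"ProcessorID\":\"3c97dfd8-aaa4-3a37-626e-fed5a4822d14\"}}}}";
        final HttpURLConnection con = (HttpURLConnection)
                new URL("http://localhost:8080/nifi-api/provenance").openConnection();
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "application/json");
        con.setDoOutput(true);
        try (OutputStream out = con.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // The response holds a query id; poll GET /nifi-api/provenance/{id}
        // until it reports finished, then DELETE /nifi-api/provenance/{id}.
        System.out.println("HTTP " + con.getResponseCode());
    }
}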
Koji

On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <[email protected]> wrote:

Hi Koji, sorry about the provenance exception; it was because there was no space left on the machine (the disk had filled up with logs).

Regards,
Ben

2017-12-27 17:11 GMT+08:00 尹文才 <[email protected]>:

Hi Koji, thanks. The names of the temp tables are created with the format "yyyyMMddHHmmssSSS-nnnn": the first part indicates the creation time and the second part is a random four-digit number. So I don't think it is possible to get two duplicate table names; the only possibility I can think of is that the same FlowFile is passed into the processor twice.
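For reference, here is a minimal sketch of that naming scheme (illustrative only; this is not the actual processor code):

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

public class TempTableName {
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // e.g. "20171226031801926-9195": millisecond timestamp plus 4-digit random suffix
    static String next() {
        final int suffix = ThreadLocalRandom.current().nextInt(10_000);
        return LocalDateTime.now().format(TS) + "-" + String.format("%04d", suffix);
    }

    public static void main(String[] args) {
        System.out.println(next());
    }
}

Two such names can only collide if they are generated within the same millisecond with the same four-digit suffix, so a duplicate almost certainly means the same FlowFile was processed twice rather than a naming collision.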
About the provenance: I had updated the configuration to use the WriteAheadProvenanceRepository implementation, but when I tried to check the data provenance, it showed me the following exception message:

HTTP ERROR 500

Problem accessing /nifi/provenance. Reason:

    Server Error

Caused by:

javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
    at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
    at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
    at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
    at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:564)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
    at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
    at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
    at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
    ... 3 more
Caused by: java.lang.NullPointerException
    at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
    at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
    at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
    ... 36 more

(The error page then prints the same ServletHolder$1 and NullPointerException cause chains twice more, frame for frame; they are omitted here.)

My configuration inside nifi.properties is as below:

# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
nifi.provenance.repository.debug.frequency=1_000_000
nifi.provenance.repository.encryption.key.provider.implementation=
nifi.provenance.repository.encryption.key.provider.location=
nifi.provenance.repository.encryption.key.id=
nifi.provenance.repository.encryption.key=

# Persistent Provenance Repository Properties
nifi.provenance.repository.directory.default=../provenance_repository
nifi.provenance.repository.max.storage.time=24 hours
nifi.provenance.repository.max.storage.size=1 GB
nifi.provenance.repository.rollover.time=30 secs
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=1
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
nifi.provenance.repository.index.shard.size=4 GB

By the way, does Data Provenance list all FlowFiles ever created, or only some of them? Should I try to find the FlowFile using the exception time from the log? Thanks.

Regards,
Ben

2017-12-27 16:57 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

The ExecuteSqlCommand retry logic does not execute the same query multiple times if it succeeds. So input FlowFiles containing the same query must have been passed in more than once. It could be the same FlowFile, or different FlowFiles generated by the first processor for some reason. To investigate that kind of FlowFile-level information, NiFi provenance data and the FlowFile lineage view are very useful:
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage

I didn't mention it earlier because you were having Provenance repository performance issues, but I hope you can use it now with the WriteAheadProvenanceRepository.

Thanks,
Koji

On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <[email protected]> wrote:

Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute the SQL query when the connection is lost (the connection can be unstable); my idea is to transfer the FlowFile to the success relationship only after the SQL query has executed successfully. As you can see from the do-while loop in the code, the transaction is rolled back if the execution fails, and if the connection is lost it retries the SQL. Will this logic cause my SQL to be executed twice?

For the WaitBatch processor, I will take your approach and test it individually to see whether WaitBatch could cause the FlowFile repository checkpointing failure.

Regards,
Ben
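One way to keep that retry from looping forever, and to make the failure visible to the flow, is to bound the attempts and re-throw at the end. A minimal sketch (illustrative, not the actual processor; dbcpService, sql, and maxRetries are assumed to come from the processor's properties):

import java.sql.Connection;
import java.sql.Statement;
import org.apache.nifi.processor.exception.ProcessException;

// Bounded retry: commit once on success, roll back and retry on failure,
// and surface the error once the retry budget is exhausted.
private void executeWithRetry(final String sql, final int maxRetries) {
    for (int attempt = 1; ; attempt++) {
        try (Connection con = dbcpService.getConnection()) {
            con.setAutoCommit(false);
            try (Statement stmt = con.createStatement()) {
                stmt.execute(sql);
                con.commit();   // success: the caller transfers the FlowFile to 'success'
                return;
            } catch (Exception ex) {
                con.rollback(); // undo the partial transaction before retrying
                throw ex;
            } finally {
                con.setAutoCommit(true); // pooled connections keep their state across borrows
            }
        } catch (Exception ex) {
            if (attempt >= maxRetries) {
                // Let onTrigger decide: route to 'failure' or roll back the session.
                throw new ProcessException("SQL still failing after " + attempt + " attempts", ex);
            }
        }
    }
}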
2017-12-27 16:10 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

Excuse me, I'm trying, but I probably don't fully understand what you want to achieve with this flow.

It looks odd that WaitBatch fails with such a FlowFile repository error while other processors such as ReplaceText succeed. I recommend testing WaitBatch alone first, without the database-related processors, by feeding it a test FlowFile that has the expected attributes. Such input FlowFiles can be created with the GenerateFlowFile processor. If the same error happens with only the WaitBatch processor, it should be easier to debug.

Thanks,
Koji

On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <[email protected]> wrote:

Hi Ben,

The one thing that looks strange in the screenshot is that ExecuteSqlCommand has FlowFiles queued in its incoming connection. Those should have been transferred to the 'failure' relationship.

Looking at the executeSql() method, shouldn't it re-throw the caught exception?

try (Connection con = dbcpService.getConnection()) {
    logger.debug("Setting autoCommit to false");
    con.setAutoCommit(false);

    try (Statement stmt = con.createStatement()) {
        logger.info("Executing SQL statement: {}", new Object[]{sql});
        stmt.execute(sql);

        // all SQL statements run inside a single transaction
        logger.debug("Committing the transaction");
        con.commit();
    } catch (Exception ex) {
        logger.error("Failed to execute SQL statement: {}", new Object[]{sql, ex});
        con.rollback();
        // re-throw the exception to the outer handler
        throw ex;
    } finally {
        logger.debug("Setting autoCommit back to true");
        con.setAutoCommit(true);
    }
} catch (Exception ex) {
    // HERE, the exception is swallowed; that's why the FlowFiles stay in
    // the incoming connection.
    logger.error("Retrying SQL statement: {}", new Object[]{sql, ex});
    retryOnFail = true;
}

Thanks,
Koji
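A minimal sketch of what the outer handler could look like instead, so failures leave the incoming queue (REL_SUCCESS/REL_FAILURE follow the usual NiFi relationship convention; this illustrates the suggestion above and is not the actual processor code):

FlowFile flowFile = session.get();
if (flowFile == null) {
    return;
}
try {
    executeSql(sql);                          // now re-throws instead of swallowing
    session.transfer(flowFile, REL_SUCCESS);  // transfer only after the commit succeeded
} catch (final Exception ex) {
    getLogger().error("SQL execution failed for {}", new Object[]{flowFile, ex});
    // Penalize and route to 'failure' so the queue drains instead of sticking.
    session.transfer(session.penalize(flowFile), REL_FAILURE);
    context.yield();
}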
On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <[email protected]> wrote:

Hi Koji, no problem. You can check the code of the WaitBatch processor at this link:
https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ

I have also uploaded a snapshot of the part of the NiFi flow that includes ExecuteSqlCommand and WaitBatch; you can see the picture at this link:
https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view

You mentioned above that a FlowFile repository checkpointing failure can cause processors to process the same FlowFile again, but as you can see from my snapshot, ExecuteSqlCommand is the second processor and sits before the WaitBatch processor. Even if the FlowFile repository checkpointing failure is caused by WaitBatch, could it lead the processors before it to process a FlowFile multiple times? Thanks.

Regards,
Ben

2017-12-27 12:36 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

I was referring to these two log messages in your previous email. They are both written by ExecuteSqlCommand; they do not mean 'it was executed again'.

```
2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT
TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
c.z.nifi.processors.ExecuteSqlCommand
ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT
```

As you wrote, a FlowFile repository checkpointing failure can cause processors to process the same FlowFiles again. However, there is no simple solution that lets every processor roll back its work, as different processors do different things. Creating the temp table only if it does not exist seems like the right approach to me.

At the same time, the root cause of the FlowFile repository failure should be investigated. Is it possible to share the WaitBatch code? The reason I ask is that every 'FlowFile Repository failed to update' error in the log you shared earlier is related to the WaitBatch processor.

Thanks,
Koji

On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <[email protected]> wrote:

Hi Koji, I print the SQL before actually executing it, but I checked the error log line you mentioned in your reply: that error was thrown by NiFi from within another processor, called WaitBatch. I didn't find errors similar to the one from the ExecuteSqlCommand processor; I think that is because only ExecuteSqlCommand is used to create temp database tables.

You can check my ExecuteSqlCommand code via this link:
https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P

If the error is really caused by a FlowFile repository checkpoint failure and the FlowFile was executed twice, I may have to create the temp table only if it doesn't exist (see the sketch below). I didn't fix the bug this way right away because I was afraid such a fix could mask other problems.

Thanks.

Regards,
Ben
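A minimal sketch of that guard for SQL Server (the OBJECT_ID test is standard T-SQL; the table name is copied from the log below and the surrounding Java is illustrative, not the actual processor code):

import java.sql.Connection;
import java.sql.Statement;

// Guarded DDL: if the FlowFile is replayed, the second run becomes a no-op
// instead of failing with "There is already an object named ...".
final String table = "tmp.ods_extractDataDebug_20171226031801926_9195";
final String guardedSql =
        "IF OBJECT_ID(N'" + table + "', N'U') IS NULL\n" +
        "BEGIN\n" +
        "    SELECT TOP 0 * INTO " + table + " FROM dbo.ods_extractDataDebug;\n" +
        "    ALTER TABLE " + table + " DROP COLUMN _id;\n" +
        "END";
try (Connection con = dbcpService.getConnection();
     Statement stmt = con.createStatement()) {
    stmt.execute(guardedSql);
}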
2017-12-27 11:38 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

The following two log messages are very close in terms of written timestamp but have different log levels:
2017-12-26 07:00:01,312 INFO
2017-12-26 07:00:01,315 ERROR

I guess both are logged within a single onTrigger of your ExecuteSqlCommand custom processor: one before executing, the other when it caught an exception. I'm just guessing, as I don't have access to the code.

Does the same issue happen with processors bundled with Apache NiFi, without your custom processor running?

If NiFi fails to update/checkpoint the FlowFile repository, then the same FlowFile can be processed again after NiFi restarts.

Thanks,
Koji

On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <[email protected]> wrote:

Thanks Koji, I will look into this article about the record model.

By the way, the error I previously mentioned to you occurred again, and I can see in the log that the SQL query was executed twice; this time I had turned on verbose NiFi logging. The SQL query is as below:

2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Executing SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;

and it was executed again later:

2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1] c.z.nifi.processors.ExecuteSqlCommand ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14] Failed to execute SQL statement: SELECT TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM dbo.ods_extractDataDebug;
alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;: com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
com.microsoft.sqlserver.jdbc.SQLServerException: There is already an object named 'ods_extractDataDebug_20171226031801926_9195' in the database.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
    at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I also saw a lot of NiFi exceptions like "ProcessException: FlowFile Repository failed to update"; I'm not sure whether that is the reason the FlowFile got processed twice. Could you help take a look at my log file? Thanks. You can get the log file via this link:
https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view

Best Regards,
Ben

2017-12-27 10:00 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

This blog post written by Mark would be a good starting point to get familiar with the NiFi Record model:
https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi

HA for the DistributedMapCacheClientService and DistributedMapCacheServer pair is not supported at the moment. If you need high availability, RedisDistributedMapCacheClientService with Redis replication will provide that, though I haven't tried it myself:
https://redis.io/topics/replication

Thanks,
Koji

On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <[email protected]> wrote:

Thanks for your quick response, Koji. I hadn't heard of or seen anything about the NiFi record data model when I was reading the NiFi documentation; could you tell me where this model is documented? Thanks.

By the way, to my knowledge, when you need to use the DistributedMapCacheServer from DistributedMapCacheClientService, you need to specify the host URL for the server. Inside a NiFi cluster, this means that if I specify the cache server and that node suddenly goes down, I can't use the cache until the node comes back up, right? Is there currently a cache server in NiFi that supports HA? Thanks.

Regards,
Ben

2017-12-26 18:34 GMT+08:00 Koji Kawamura <[email protected]>:

Hi Ben,

As you found from the existing code, DistributedMapCache is used to share state among different processors, and it can be used by your custom processors, too. However, I'd recommend avoiding such tight dependencies between FlowFiles if possible, or at least minimizing the part of the flow that requires that constraint, for better performance and simplicity. For example, since a FlowFile can hold a fairly large amount of data, you could merge all the FlowFiles into a single FlowFile instead of using batches of FlowFiles. If you need logical boundaries, you can use the NiFi Record data model to embed multiple records within a FlowFile; Records should perform better.

Hope this helps.

Thanks,
Koji
On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <[email protected]> wrote:

Hi guys, I'm currently trying to find a proper way in NiFi to sync status between my custom processors.

Our requirement is like this: we're doing some ETL work using NiFi, and I'm extracting the data from a DB into batches of FlowFiles (each batch has a flag FlowFile indicating the end of the batch). Some groups of custom processors downstream need to process these FlowFiles to do some business-logic work, and we expect those processors to handle one batch of FlowFiles at a time. Therefore we need to implement a custom Wait processor (let's call it WaitBatch here) that holds all the other batches of FlowFiles while the business processors are handling the batch of FlowFiles whose creation time is earliest.

In order to implement this, all the WaitBatch processors placed in the flow need to read/update records in a shared map so that each set of business-logic processors processes one batch at a time. The entries are keyed by the batch number of the FlowFiles, and the value of each entry is a batch release counter that counts the number of times the batch of FlowFiles has passed through a WaitBatch processor. When a batch is released by a WaitBatch, it increments the batch number entry's value by 1, and the released batch number and counter number are also saved locally at the WaitBatch with the StateManager. When the next batch reaches the WaitBatch, it checks whether the counter value of the previously released batch number in the shared map is greater than the one saved locally; if the entry for that batch number doesn't exist (already removed) or the value in the shared map is greater, the next batch is released, and the local state and the entry in the shared map are updated in the same way. At the end of the flow, a custom processor takes the batch number from each batch and removes the corresponding entry from the shared map.

So this implementation requires a shared map that can be read and updated frequently and atomically. I checked the Wait/Notify processors in NiFi and saw that they use the DistributedMapCacheClientService and DistributedMapCacheServer to sync status, so I'm wondering whether I could use the DistributedMapCacheClientService to implement my logic (a sketch follows below). I also saw another implementation called RedisDistributedMapCacheClientService, which seems to require Redis (I haven't used Redis). Thanks in advance for any suggestions.

Regards,
Ben
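For reference, the Wait/Notify processors do their atomic updates through the AtomicDistributedMapCacheClient interface (fetch an entry together with a revision, then replace it only if the revision is unchanged), which DistributedMapCacheClientService implements. Below is a minimal sketch of the release-counter increment on top of that API; the method shapes and the Optional-style revision are my reading of the NiFi 1.x client interfaces, so please verify them against the NiFi version you run:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;

public class BatchCounter {
    private final AtomicDistributedMapCacheClient<Long> cache; // from a controller-service property

    private final Serializer<String> serializer =
            (value, out) -> out.write(value.getBytes(StandardCharsets.UTF_8));
    private final Deserializer<String> deserializer =
            bytes -> bytes == null || bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);

    public BatchCounter(final AtomicDistributedMapCacheClient<Long> cache) {
        this.cache = cache;
    }

    /** Optimistic increment: retried whenever another WaitBatch updated the entry first. */
    public long increment(final String batchNumber) throws IOException {
        while (true) {
            final AtomicCacheEntry<String, String, Long> current =
                    cache.fetch(batchNumber, serializer, deserializer);
            final long next = current == null || current.getValue() == null
                    ? 1 : Long.parseLong(current.getValue()) + 1;
            final AtomicCacheEntry<String, String, Long> updated = new AtomicCacheEntry<>(
                    batchNumber, Long.toString(next),
                    current == null ? null : current.getRevision().orElse(null));
            // replace() returns false on a revision conflict; loop and re-fetch in that case.
            if (cache.replace(updated, serializer, serializer)) {
                return next;
            }
        }
    }
}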
