I see, thanks. The easiest way to look at provenance events is to
right-click the processor instance you are interested in and select
'View data provenance' from the context menu. NiFi then displays the
provenance events for the selected processor.

Koji

On Wed, Dec 27, 2017 at 6:17 PM, 尹文才 <[email protected]> wrote:
> Hi Koji, sorry about the provenance exception. It happened because there was
> no space left on the machine (the disk had filled up with logs).
>
> Regards,
> Ben
>
> 2017-12-27 17:11 GMT+08:00 尹文才 <[email protected]>:
>
>> Hi Koji, thanks. The temp table names are created with the format
>> "yyyyMMddHHmmssSSS-nnnn": the first part is a timestamp and the second
>> part is a random 4-digit number.
>> So I don't think it's possible to have 2 duplicate table names; the only
>> possibility I can think of is that the FlowFile is passed into the processor twice.
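
For illustration, a name in that format could be produced as in the sketch
below. The class and method names are hypothetical, and the real processor
may build the name differently (the table names in the quoted logs use an
underscore between the two parts). With a millisecond timestamp plus a random
4-digit suffix, two different FlowFiles are unlikely to get the same name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not the actual processor code.
final class TempTableNames {
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // Builds "<17-digit timestamp>-<4-digit suffix>" for the given inputs.
    static String build(LocalDateTime time, int suffix) {
        if (suffix < 0 || suffix > 9999) {
            throw new IllegalArgumentException("suffix must be in 0..9999");
        }
        return TS.format(time) + "-" + String.format("%04d", suffix);
    }

    // Same format, using the current time and a random suffix.
    static String next() {
        return build(LocalDateTime.now(),
                ThreadLocalRandom.current().nextInt(10_000));
    }

    public static void main(String[] args) {
        System.out.println(next());
    }
}
```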
>>
>> About the provenance, I have switched to the
>> WriteAheadProvenanceRepository implementation, but when I tried to check
>> the data provenance, it showed me the following exception message:
>> HTTP ERROR 500
>>
>> Problem accessing /nifi/provenance. Reason:
>>
>>     Server Error
>>
>> Caused by:
>>
>> javax.servlet.ServletException: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:138)
>>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>>       at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>>       at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>>       at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>>       at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>>       at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>>       at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>>       at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>>       at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>>       at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>>       at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>>       at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>>       at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>>       ... 3 more
>> Caused by: java.lang.NullPointerException
>>       at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>>       at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>>       at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>>       ... 36 more
>>
>> Caused by:
>>
>> org.eclipse.jetty.servlet.ServletHolder$1: java.lang.NullPointerException
>>       at org.eclipse.jetty.servlet.ServletHolder.makeUnavailable(ServletHolder.java:596)
>>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:655)
>>       at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>>       at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>>       at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>>       at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>>       at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>>       at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>>       at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>>       at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>>       at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>>       at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>>       at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>>       at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>>       at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>>       at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>>       ... 36 more
>>
>> Caused by:
>>
>> java.lang.NullPointerException
>>       at org.apache.jasper.servlet.JspServlet.handleMissingResource(JspServlet.java:397)
>>       at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:387)
>>       at org.apache.jasper.servlet.JspServlet.init(JspServlet.java:138)
>>       at org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:637)
>>       at org.eclipse.jetty.servlet.ServletHolder.getServlet(ServletHolder.java:498)
>>       at org.eclipse.jetty.servlet.ServletHolder.ensureInstance(ServletHolder.java:785)
>>       at org.eclipse.jetty.servlet.ServletHolder.prepare(ServletHolder.java:770)
>>       at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:538)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
>>       at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:190)
>>       at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1593)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1239)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
>>       at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:481)
>>       at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1562)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
>>       at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1141)
>>       at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
>>       at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:118)
>>       at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:561)
>>       at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>>       at org.eclipse.jetty.server.Server.handle(Server.java:564)
>>       at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
>>       at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
>>       at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:258)
>>       at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:147)
>>       at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
>>       at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
>>       at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:122)
>>       at org.eclipse.jetty.util.thread.strategy.ExecutingExecutionStrategy.invoke(ExecutingExecutionStrategy.java:58)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:201)
>>       at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:133)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
>>       at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
>>       at java.lang.Thread.run(Thread.java:745)
>>
>> My configuration inside nifi.properties is as below:
>> # Provenance Repository Properties
>> nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
>> nifi.provenance.repository.debug.frequency=1_000_000
>> nifi.provenance.repository.encryption.key.provider.implementation=
>> nifi.provenance.repository.encryption.key.provider.location=
>> nifi.provenance.repository.encryption.key.id=
>> nifi.provenance.repository.encryption.key=
>>
>> # Persistent Provenance Repository Properties
>> nifi.provenance.repository.directory.default=../provenance_repository
>> nifi.provenance.repository.max.storage.time=24 hours
>> nifi.provenance.repository.max.storage.size=1 GB
>> nifi.provenance.repository.rollover.time=30 secs
>> nifi.provenance.repository.rollover.size=100 MB
>> nifi.provenance.repository.query.threads=2
>> nifi.provenance.repository.index.threads=1
>> nifi.provenance.repository.compress.on.rollover=true
>> nifi.provenance.repository.always.sync=false
>> nifi.provenance.repository.index.shard.size=4 GB
>>
>>
>> By the way, does Data Provenance list all FlowFiles ever created, or
>> only some of them? Should I try to find the FlowFile by the time of the
>> exception in the log? Thanks.
>>
>> Regards,
>> Ben
>>
>> 2017-12-27 16:57 GMT+08:00 Koji Kawamura <[email protected]>:
>>
>>> Hi Ben,
>>>
>>> The ExecuteSqlCommand retry logic does not execute the same query
>>> multiple times if it succeeds.
>>> So input FlowFiles containing the same query must have been
>>> passed in more than once.
>>> It could be the same FlowFile, or different FlowFiles generated by the
>>> first processor for some reason.
>>> To investigate this kind of FlowFile-level information, NiFi
>>> provenance data and FlowFile lineage will be very useful.
>>> https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#viewing-flowfile-lineage
>>>
>>> I didn't mention it earlier because you were having a Provenance
>>> repository performance issue, but I hope you can use it now with the
>>> WriteAheadProvenanceRepository.
>>>
>>> Thanks,
>>> Koji
>>>
>>> On Wed, Dec 27, 2017 at 5:44 PM, 尹文才 <[email protected]> wrote:
>>> > Thanks Koji. For the ExecuteSqlCommand issue, I was trying to re-execute
>>> > the sql query if the connection is lost (the connection could be unstable).
>>> > My idea is to transfer the FlowFile to the success relationship only
>>> > after successfully executing the sql query. As you can see from the do-while
>>> > loop in the code, the transaction is rolled back if the execution fails; if
>>> > the connection is lost, it retries executing the sql.
>>> > Will this logic cause my sql to be executed twice?
>>> >
>>> > For the WaitBatch processor, I will take your approach and test it
>>> > individually to see whether it can cause the FlowFile repository
>>> > checkpointing failure.
>>> >
>>> > Regards,
>>> > Ben
>>> >
>>> > 2017-12-27 16:10 GMT+08:00 Koji Kawamura <[email protected]>:
>>> >
>>> >> Hi Ben,
>>> >>
>>> >> Excuse me, I'm trying, but I probably don't fully understand what you
>>> >> want to achieve with the flow.
>>> >>
>>> >> It looks weird that WaitBatch is failing with such a FlowFile repository
>>> >> error while other processors such as ReplaceText succeed.
>>> >> I recommend testing WaitBatch alone first, without combining it with the
>>> >> database-related processors, by feeding it a test FlowFile that has the
>>> >> expected FlowFile attributes.
>>> >> Such input FlowFiles can be created with the GenerateFlowFile processor.
>>> >> If the same error happens with only the WaitBatch processor, then it
>>> >> should be easier to debug.
>>> >>
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >> On Wed, Dec 27, 2017 at 4:49 PM, Koji Kawamura <[email protected]>
>>> >> wrote:
>>> >> > Hi Ben,
>>> >> >
>>> >> > The one thing that looks strange in the screenshot is that
>>> >> > ExecuteSqlCommand has FlowFiles queued in its incoming connection.
>>> >> > Those should be transferred to the 'failure' relationship.
>>> >> >
>>> >> > In the following executeSql() method, shouldn't it re-throw the caught
>>> >> > exception?
>>> >> >
>>> >> >
>>> >> >             try (Connection con = dbcpService.getConnection()) {
>>> >> >                 logger.debug("设置autoCommit为false"); // "set autoCommit to false"
>>> >> >                 con.setAutoCommit(false);
>>> >> >
>>> >> >                 try (Statement stmt = con.createStatement()) {
>>> >> >                     logger.info("执行sql语句: {}", new Object[]{sql}); // "executing SQL statement"
>>> >> >                     stmt.execute(sql);
>>> >> >
>>> >> >                     // All SQL statements execute within one transaction
>>> >> >                     logger.debug("提交transaction"); // "committing transaction"
>>> >> >                     con.commit();
>>> >> >                 } catch (Exception ex) {
>>> >> >                     logger.error("执行sql语句失败:{}", new Object[]{sql, ex}); // "failed to execute SQL statement"
>>> >> >                     con.rollback();
>>> >> >                     // Throw the exception to the outer layer for handling
>>> >> >                     throw ex;
>>> >> >                 } finally {
>>> >> >                     logger.debug("重新设置autoCommit为true"); // "resetting autoCommit to true"
>>> >> >                     con.setAutoCommit(true);
>>> >> >                 }
>>> >> >             } catch (Exception ex) {
>>> >> >                 // HERE, the exception is swallowed; that's why the FlowFiles stay in
>>> >> >                 // the incoming connection.
>>> >> >                 logger.error("重试执行sql语句:{}", new Object[]{sql, ex}); // "retrying SQL statement"
>>> >> >                 retryOnFail = true;
>>> >> >             }
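
A minimal sketch of what re-throwing could look like: retry a bounded number
of times and, once the attempts are used up, let the last exception propagate
so that the caller (onTrigger in a processor) can route the FlowFile to
'failure' instead of leaving it queued. SqlAction, BoundedRetry, runWithRetry
and maxAttempts are hypothetical names for illustration; they are not part of
NiFi or of the shared code.

```java
import java.sql.SQLException;

// Hypothetical stand-in for "run the SQL inside a transaction".
interface SqlAction {
    void run() throws Exception;
}

final class BoundedRetry {
    // Runs the action up to maxAttempts times and returns the attempts used.
    // If every attempt fails, the last exception is re-thrown, not swallowed.
    static int runWithRetry(SqlAction action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated action: fails twice with a lost connection, then succeeds.
        int used = runWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new SQLException("connection lost");
            }
        }, 5);
        System.out.println("succeeded after " + used + " attempts");
    }
}
```

A real implementation would probably retry only on errors that indicate a
lost connection; a statement error such as "object already exists" fails the
same way on every attempt.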
>>> >> >
>>> >> > Thanks,
>>> >> > Koji
>>> >> >
>>> >> > On Wed, Dec 27, 2017 at 2:38 PM, 尹文才 <[email protected]> wrote:
>>> >> >> Hi Koji, no problem. You can check the code of the WaitBatch processor
>>> >> >> at the link:
>>> >> >> https://drive.google.com/open?id=1DMpW5GMiXpyZQdui989Rr3D9rlchQfWQ
>>> >> >>
>>> >> >> I also uploaded a screenshot of the part of the NiFi flow that includes
>>> >> >> ExecuteSqlCommand and WaitBatch; you can check the picture at the link:
>>> >> >> https://drive.google.com/file/d/1vdxlWj8ANHQH0CMrXnydLni5o-3IVi2h/view
>>> >> >>
>>> >> >> You mentioned above that a FlowFile repository checkpointing failure
>>> >> >> can cause other processors to process the same FlowFile again. But as you
>>> >> >> can see from my screenshot, ExecuteSqlCommand is the second processor
>>> >> >> and comes before the WaitBatch processor. Even if the FlowFile repository
>>> >> >> checkpointing failure is caused by WaitBatch, could it lead the
>>> >> >> processors before it to process a FlowFile multiple times? Thanks.
>>> >> >>
>>> >> >> Regards,
>>> >> >> Ben
>>> >> >>
>>> >> >> 2017-12-27 12:36 GMT+08:00 Koji Kawamura <[email protected]>:
>>> >> >>
>>> >> >>> Hi Ben,
>>> >> >>>
>>> >> >>> I was referring to these two log messages in your previous email.
>>> >> >>> Both messages are written by ExecuteSqlCommand; they do not
>>> >> >>> mean 'it was executed again'.
>>> >> >>>
>>> >> >>> ```
>>> >> >>> 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> >> >>> 执行sql语句 [executing SQL statement]: SELECT
>>> >> >>> TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> >>> dbo.ods_extractDataDebug;
>>> >> >>> alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>>> >> >>>
>>> >> >>> and it was executed again later:
>>> >> >>>
>>> >> >>> 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> >> >>> c.z.nifi.processors.ExecuteSqlCommand
>>> >> >>> ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> >> >>> 执行sql语句失败 [failed to execute SQL statement]: SELECT
>>> >> >>> ```
>>> >> >>>
>>> >> >>> As you wrote, a case where the FlowFile repository fails checkpointing
>>> >> >>> can cause other processors to process the same FlowFiles again. However,
>>> >> >>> there is no simple way for every processor to roll back its
>>> >> >>> work, as different processors do different things. Creating the temp table
>>> >> >>> only if it does not exist seems like the right approach to me.
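
As a rough sketch of the "create only if it does not exist" idea, the
statement could be wrapped in a T-SQL existence check built on OBJECT_ID.
The class and method names below are hypothetical, the name validation is a
deliberately strict assumption, and the follow-up "alter table ... drop column"
statement from the logs is left out; it would need the same kind of guard.

```java
import java.util.regex.Pattern;

// Hypothetical helper that builds an idempotent "create temp copy" statement.
final class IdempotentTempTable {
    // Accepts plain "name" or "schema.name" identifiers only.
    private static final Pattern SAFE_NAME = Pattern.compile(
            "[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?");

    static String createIfMissing(String source, String target) {
        requireSafe(source);
        requireSafe(target);
        // OBJECT_ID(..., N'U') returns NULL when no user table has that name,
        // so running the statement a second time does nothing.
        return "IF OBJECT_ID(N'" + target + "', N'U') IS NULL\n"
                + "BEGIN\n"
                + "    SELECT TOP 0 * INTO " + target + " FROM " + source + ";\n"
                + "END";
    }

    private static void requireSafe(String name) {
        if (name == null || !SAFE_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException("unsafe table name: " + name);
        }
    }

    public static void main(String[] args) {
        System.out.println(createIfMissing(
                "dbo.ods_extractDataDebug",
                "tmp.ods_extractDataDebug_20171226031801926_9195"));
    }
}
```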
>>> >> >>>
>>> >> >>> At the same time, the root cause of the FlowFile repository
>>> >> >>> failure should be investigated. Is it possible to share the WaitBatch code?
>>> >> >>> The reason I ask is that every 'FlowFile Repository failed to update'
>>> >> >>> message in the log that you shared earlier is related to the WaitBatch
>>> >> >>> processor.
>>> >> >>>
>>> >> >>> Thanks,
>>> >> >>> Koji
>>> >> >>>
>>> >> >>> On Wed, Dec 27, 2017 at 1:19 PM, 尹文才 <[email protected]> wrote:
>>> >> >>> > Hi Koji, I will print the sql before actually executing it. But I checked
>>> >> >>> > the error log line you mentioned in your reply, and that error was thrown by
>>> >> >>> > NiFi from within another processor called WaitBatch.
>>> >> >>> > I didn't find other errors similar to the one from the ExecuteSqlCommand
>>> >> >>> > processor; I think that's because only ExecuteSqlCommand is used to
>>> >> >>> > create temp database tables.
>>> >> >>> > You can check my ExecuteSqlCommand code via the link:
>>> >> >>> > https://drive.google.com/open?id=1NnjBihyKpmUPEH7X28Mh2hgOrhjSk_5P
>>> >> >>> >
>>> >> >>> > If the error is really caused by a FlowFile repository checkpoint failure
>>> >> >>> > and the FlowFile was executed twice, I may have to create the temp table only
>>> >> >>> > if it doesn't exist. I didn't fix the bug this way
>>> >> >>> > right away because I was afraid the fix could mask some other
>>> >> >>> > problems.
>>> >> >>> >
>>> >> >>> > Thanks.
>>> >> >>> >
>>> >> >>> > Regards,
>>> >> >>> > Ben
>>> >> >>> >
>>> >> >>> > 2017-12-27 11:38 GMT+08:00 Koji Kawamura <[email protected]>:
>>> >> >>> >
>>> >> >>> >> Hi Ben,
>>> >> >>> >>
>>> >> >>> >> The following two log messages have very close
>>> >> >>> >> timestamps but different log levels.
>>> >> >>> >> 2017-12-26 07:00:01,312 INFO
>>> >> >>> >> 2017-12-26 07:00:01,315 ERROR
>>> >> >>> >>
>>> >> >>> >> I guess both are logged within a single onTrigger of your
>>> >> >>> >> ExecuteSqlCommand custom processor: one before executing, the other
>>> >> >>> >> when it caught an exception. I'm just guessing, as I don't have access
>>> >> >>> >> to the code.
>>> >> >>> >>
>>> >> >>> >> Does the same issue happen with the processors bundled with Apache
>>> >> >>> >> NiFi, without your custom processor running?
>>> >> >>> >>
>>> >> >>> >> If NiFi fails to update/checkpoint the FlowFile repository, then the same
>>> >> >>> >> FlowFile can be processed again after restarting NiFi.
>>> >> >>> >>
>>> >> >>> >> Thanks,
>>> >> >>> >> Koji
>>> >> >>> >>
>>> >> >>> >>
>>> >> >>> >>
>>> >> >>> >> On Wed, Dec 27, 2017 at 12:21 PM, 尹文才 <[email protected]> wrote:
>>> >> >>> >> > Thanks Koji, I will look into this article about the record model.
>>> >> >>> >> >
>>> >> >>> >> > By the way, the error I previously mentioned to you occurred again. I
>>> >> >>> >> > could see in the log that the sql query was executed twice. This time I had
>>> >> >>> >> > turned on verbose NiFi logging; the sql query is as below:
>>> >> >>> >> >
>>> >> >>> >> > 2017-12-26 07:00:01,312 INFO [Timer-Driven Process Thread-1]
>>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> >> >>> >> > 执行sql语句 [executing SQL statement]: SELECT
>>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> >>> >> > dbo.ods_extractDataDebug;
>>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;
>>> >> >>> >> >
>>> >> >>> >> > and it was executed again later:
>>> >> >>> >> >
>>> >> >>> >> > 2017-12-26 07:00:01,315 ERROR [Timer-Driven Process Thread-1]
>>> >> >>> >> > c.z.nifi.processors.ExecuteSqlCommand
>>> >> >>> >> > ExecuteSqlCommand[id=3c97dfd8-aaa4-3a37-626e-fed5a4822d14]
>>> >> >>> >> > 执行sql语句失败 [failed to execute SQL statement]: SELECT
>>> >> >>> >> > TOP 0 * INTO tmp.ods_extractDataDebug_20171226031801926_9195 FROM
>>> >> >>> >> > dbo.ods_extractDataDebug;
>>> >> >>> >> > alter table tmp.ods_extractDataDebug_20171226031801926_9195 drop column _id;:
>>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
>>> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
>>> >> >>> >> > [There is already an object named
>>> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' in the database.]
>>> >> >>> >> > com.microsoft.sqlserver.jdbc.SQLServerException: 数据库中已存在名为
>>> >> >>> >> > 'ods_extractDataDebug_20171226031801926_9195' 的对象。
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:217)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1655)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:885)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:778)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7505)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2445)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:191)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:166)
>>> >> >>> >> > at com.microsoft.sqlserver.jdbc.SQLServerStatement.execute(SQLServerStatement.java:751)
>>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> >>> >> > at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264)
>>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.executeSql(ExecuteSqlCommand.java:194)
>>> >> >>> >> > at com.zjrealtech.nifi.processors.ExecuteSqlCommand.onTrigger(ExecuteSqlCommand.java:164)
>>> >> >>> >> > at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>>> >> >>> >> > at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
>>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>>> >> >>> >> > at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>>> >> >>> >> > at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
>>> >> >>> >> > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> >> >>> >> > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> >> >>> >> > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> >> >>> >> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> >> >>> >> > at java.lang.Thread.run(Thread.java:745)
>>> >> >>> >> >
>>> >> >>> >> > I also saw a lot of NiFi exceptions like "ProcessException: FlowFile
>>> >> >>> >> > Repository failed to update"; I'm not sure if this is the reason the
>>> >> >>> >> > FlowFile got processed twice. Could you help take a look at my log file?
>>> >> >>> >> > Thanks.
>>> >> >>> >> > You can get the log file via the link:
>>> >> >>> >> > https://drive.google.com/file/d/1uVgtAVNEHxAbAPEpNTOWq_N9Xu6zMEi3/view
>>> >> >>> >> >
>>> >> >>> >> > Best Regards,
>>> >> >>> >> > Ben
>>> >> >>> >> >
>>> >> >>> >> > 2017-12-27 10:00 GMT+08:00 Koji Kawamura <[email protected]>:
>>> >> >>> >> >
>>> >> >>> >> >> Hi Ben,
>>> >> >>> >> >>
>>> >> >>> >> >> This blog post written by Mark would be a good starting point for
>>> >> >>> >> >> getting familiar with the NiFi Record model.
>>> >> >>> >> >> https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
>>> >> >>> >> >>
>>> >> >>> >> >> HA for the DistributedMapCacheClientService and DistributedMapCacheServer
>>> >> >>> >> >> pair is not supported at the moment. If you need high availability,
>>> >> >>> >> >> RedisDistributedMapCacheClientService with Redis replication will
>>> >> >>> >> >> provide that, though I haven't tried it myself.
>>> >> >>> >> >> https://redis.io/topics/replication
>>> >> >>> >> >>
>>> >> >>> >> >> Thanks,
>>> >> >>> >> >> Koji
>>> >> >>> >> >>
>>> >> >>> >> >> On Tue, Dec 26, 2017 at 7:58 PM, 尹文才 <[email protected]>
>>> >> wrote:
>>> >> >>> >> >> > Thanks for your quick response, Koji. I hadn't heard or seen anything
>>> >> >>> >> >> > about the NiFi Record data model when I was reading the NiFi
>>> >> >>> >> >> > documentation. Could you tell me where this model is documented?
>>> >> >>> >> >> > Thanks.
>>> >> >>> >> >> >
>>> >> >>> >> >> > By the way, to my knowledge, when you use the DistributedMapCacheServer
>>> >> >>> >> >> > from DistributedMapCacheClientService, you need to specify the host
>>> >> >>> >> >> > URL of the server. This means that, inside a NiFi cluster, if the node
>>> >> >>> >> >> > I specified as the cache server suddenly goes down, I can't use the
>>> >> >>> >> >> > cache until the node comes back up, right? Is there currently a cache
>>> >> >>> >> >> > server in NiFi that supports HA? Thanks.
>>> >> >>> >> >> >
>>> >> >>> >> >> > Regards,
>>> >> >>> >> >> > Ben
>>> >> >>> >> >> >
>>> >> >>> >> >> > 2017-12-26 18:34 GMT+08:00 Koji Kawamura <[email protected]>:
>>> >> >>> >> >> >
>>> >> >>> >> >> >> Hi Ben,
>>> >> >>> >> >> >>
>>> >> >>> >> >> >> As you found from existing code, DistributedMapCache is used to share
>>> >> >>> >> >> >> state among different processors, and it can be used by your custom
>>> >> >>> >> >> >> processors, too.
>>> >> >>> >> >> >> However, I'd recommend avoiding such tight dependencies between
>>> >> >>> >> >> >> FlowFiles if possible, or at least minimizing the part of the flow that
>>> >> >>> >> >> >> requires that constraint, for better performance and simplicity.
>>> >> >>> >> >> >> For example, since a FlowFile can hold a fairly large amount of data,
>>> >> >>> >> >> >> you could merge all FlowFiles into a single FlowFile instead of batches
>>> >> >>> >> >> >> of FlowFiles. If you need logical boundaries, you can use the NiFi Record
>>> >> >>> >> >> >> data model to embed multiple records within a FlowFile; Record should
>>> >> >>> >> >> >> perform better.
>>> >> >>> >> >> >>
>>> >> >>> >> >> >> Hope this helps.
>>> >> >>> >> >> >>
>>> >> >>> >> >> >> Thanks,
>>> >> >>> >> >> >> Koji
>>> >> >>> >> >> >>
>>> >> >>> >> >> >>
>>> >> >>> >> >> >> On Tue, Dec 26, 2017 at 5:55 PM, 尹文才 <[email protected]> wrote:
>>> >> >>> >> >> >> > Hi guys, I'm currently trying to find a proper way in NiFi to sync
>>> >> >>> >> >> >> > status between my custom processors.
>>> >> >>> >> >> >> > Our requirement is this: we're doing some ETL work using NiFi, and I'm
>>> >> >>> >> >> >> > extracting the data from a DB into batches of FlowFiles (each batch of
>>> >> >>> >> >> >> > FlowFiles has a flag FlowFile indicating the end of the batch).
>>> >> >>> >> >> >> > There are some groups of custom processors downstream that need to
>>> >> >>> >> >> >> > process these FlowFiles to do some business logic work, and we expect
>>> >> >>> >> >> >> > these processors to process one batch of FlowFiles at a time.
>>> >> >>> >> >> >> > Therefore we need to implement a custom Wait processor (let's just call
>>> >> >>> >> >> >> > it WaitBatch here) to hold all the other batches of FlowFiles while the
>>> >> >>> >> >> >> > business processors are handling the batch of FlowFiles whose creation
>>> >> >>> >> >> >> > time is earlier.
>>> >> >>> >> >> >> >
>>> >> >>> >> >> >> > In order to implement this, all the WaitBatch processors placed in the
>>> >> >>> >> >> >> > flow need to read/update records in a shared map so that each set of
>>> >> >>> >> >> >> > business-logic processors processes one batch at a time.
>>> >> >>> >> >> >> > The entries are keyed by the batch number of the FlowFiles, and the
>>> >> >>> >> >> >> > value of each entry is a batch release counter that counts the number
>>> >> >>> >> >> >> > of times the batch of FlowFiles has passed through a WaitBatch
>>> >> >>> >> >> >> > processor.
>>> >> >>> >> >> >> > When a batch is released by WaitBatch, it will try to increment the
>>> >> >>> >> >> >> > batch number entry's value by 1, and the released batch number and
>>> >> >>> >> >> >> > counter value will also be saved locally at the WaitBatch with
>>> >> >>> >> >> >> > StateManager.
>>> >> >>> >> >> >> > When the next batch reaches the WaitBatch, it will check whether the
>>> >> >>> >> >> >> > counter value of the previously released batch number in the shared map
>>> >> >>> >> >> >> > is greater than the one saved locally. If the entry for that batch
>>> >> >>> >> >> >> > number doesn't exist (already removed) or the value in the shared map is
>>> >> >>> >> >> >> > greater, the next batch will be released, and the local state and the
>>> >> >>> >> >> >> > entry in the shared map will be updated similarly.
>>> >> >>> >> >> >> > At the end of the flow, a custom processor will get the batch number
>>> >> >>> >> >> >> > from each batch and remove the entry from the shared map.
>>> >> >>> >> >> >> >
>>> >> >>> >> >> >> > So this implementation requires a shared map that can be read and
>>> >> >>> >> >> >> > updated frequently and atomically. I checked the Wait/Notify processors
>>> >> >>> >> >> >> > in NiFi and saw that they use the DistributedMapCacheClientService and
>>> >> >>> >> >> >> > DistributedMapCacheServer to sync status, so I'm wondering if I could
>>> >> >>> >> >> >> > use the DistributedMapCacheClientService to implement my logic. I also
>>> >> >>> >> >> >> > saw another implementation called RedisDistributedMapCacheClientService,
>>> >> >>> >> >> >> > which seems to require Redis (I haven't used Redis). Thanks in advance
>>> >> >>> >> >> >> > for any suggestions.
>>> >> >>> >> >> >> >
>>> >> >>> >> >> >> > Regards,
>>> >> >>> >> >> >> > Ben
>>> >> >>> >> >> >>
>>> >> >>> >> >>
>>> >> >>> >>
>>> >> >>>
>>> >>
>>>
>>
>>
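
For reference, the WaitBatch release rule from Ben's first message at the bottom of the quoted thread can be sketched roughly as below. This is only an illustration in plain Python, not NiFi code: SharedMap is a hypothetical in-memory stand-in for the shared cache (it is not the DistributedMapCacheClient API), the two attributes on WaitBatch stand in for what would be kept with StateManager, and the sketch works at batch level only, ignoring the individual FlowFiles inside a batch.

```python
import threading


class SharedMap:
    """Hypothetical in-memory stand-in for the shared cache (not a NiFi API)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters = {}  # batch number -> release counter

    def get(self, batch):
        with self._lock:
            return self._counters.get(batch)

    def increment(self, batch):
        # Atomic read-modify-write, guarded by the lock.
        with self._lock:
            self._counters[batch] = self._counters.get(batch, 0) + 1
            return self._counters[batch]

    def remove(self, batch):
        with self._lock:
            self._counters.pop(batch, None)


class WaitBatch:
    """One gate in the flow; the local fields play the role of processor state."""

    def __init__(self, shared):
        self.shared = shared
        self.last_batch = None    # last batch number this gate released
        self.last_counter = None  # counter value recorded at that release

    def try_release(self, batch):
        if self.last_batch is not None:
            current = self.shared.get(self.last_batch)
            # Hold the new batch unless the previous batch's entry was removed
            # or its counter has grown past the locally saved value.
            if current is not None and current <= self.last_counter:
                return False
        # Release: bump the shared counter and remember it locally.
        self.last_counter = self.shared.increment(batch)
        self.last_batch = batch
        return True
```

With two gates sharing one map, batch 2 is held at the first gate until batch 1 has passed the second gate, and held at the second gate until the end of the flow removes batch 1's entry. In a real cluster the lock would have to be replaced by whatever atomic operation the chosen cache service offers.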
