[
https://issues.apache.org/jira/browse/NIFI-11671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Philipp Korniets updated NIFI-11671:
------------------------------------
Description:
We use the ForkEnrichment / JoinEnrichment pattern and want to include filtering
in the join SQL. The filter value comes from a FlowFile attribute:
{code:sql}
-- FlowFile attribute: test = 'NewValue'
SELECT original.*, enrichment.*,'${test}'
FROM original
LEFT OUTER JOIN enrichment
ON original.Underlying = enrichment.Underlying
WHERE enrichment.MyField = '${test}'
{code}
However, this doesn't work because JoinEnrichment doesn't use
evaluateAttributeExpressions.
Additionally, version 1.18 doesn't allow the whole query to be passed as an
attribute.
!screenshot-1.png|width=692,height=431!
{code:java}
2023-06-28 11:07:16,611 ERROR [Timer-Driven Process Thread-7]
o.a.n.processors.standard.JoinEnrichment
JoinEnrichment[id=dbe156ac-0187-1000-4477-0183899e0432] Failed to join
'original' FlowFile
StandardFlowFileRecord[uuid=2ab9f6ad-73a5-4763-b25e-fd26c44835e1,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1687948831976-629, container=default,
section=629], offset=8334082,
length=600557],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=600557]
and 'enrichment' FlowFile
StandardFlowFileRecord[uuid=e4bb7769-fdce-4dfe-af18-443676103035,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1687949723375-631, container=default,
section=631], offset=5362822,
length=1999502],offset=0,name=lmr_SY08C41-1_S_514682_20230627.csv,size=1999502];
routing to failure
java.sql.SQLException: Error while preparing statement [${instrumentJoinSQL}]
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
    at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement_(CalciteConnectionImpl.java:224)
    at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:203)
    at org.apache.calcite.jdbc.CalciteConnectionImpl.prepareStatement(CalciteConnectionImpl.java:99)
    at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:178)
    at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.createCalciteParameters(SqlJoinCache.java:91)
    at org.apache.nifi.processors.standard.enrichment.SqlJoinCache.getCalciteParameters(SqlJoinCache.java:65)
    at org.apache.nifi.processors.standard.enrichment.SqlJoinStrategy.join(SqlJoinStrategy.java:49)
    at org.apache.nifi.processors.standard.JoinEnrichment.processBin(JoinEnrichment.java:387)
    at org.apache.nifi.processor.util.bin.BinFiles.processBins(BinFiles.java:233)
    at org.apache.nifi.processors.standard.JoinEnrichment.processBins(JoinEnrichment.java:503)
    at org.apache.nifi.processor.util.bin.BinFiles.onTrigger(BinFiles.java:193)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1354)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: parse failed: Encountered "$" at line 1, column 1.
Was expecting one of:
"ABS" ...
{code}
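The substitution expected for the SQL in this description can be sketched as follows. This is a minimal, self-contained illustration only: it is not NiFi's Expression Language implementation and not the JoinEnrichment code. The class name AttributeSubstitutionSketch and the method substitute are hypothetical, and only plain ${name} references are handled.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for attribute evaluation; NOT NiFi's Expression Language.
final class AttributeSubstitutionSketch {

    // Matches simple ${name} references only (assumption: no functions, no nesting).
    private static final Pattern REFERENCE =
            Pattern.compile("\\$\\{([A-Za-z0-9_.\\-]+)\\}");

    // Replaces each ${name} with the attribute value, or an empty string if absent.
    public static String substitute(String sql, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(sql);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' inside attribute values literal.
            matcher.appendReplacement(result, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        String sql = "SELECT original.*, enrichment.* FROM original "
                + "LEFT OUTER JOIN enrichment "
                + "ON original.Underlying = enrichment.Underlying "
                + "WHERE enrichment.MyField = '${test}'";
        // The attribute value is taken from the example in the description.
        Map<String, String> attributes = Collections.singletonMap("test", "NewValue");
        System.out.println(substitute(sql, attributes));
    }
}
```

With the substitution applied first, the SQL parser would receive a statement with no "$" in it. Splicing attribute values directly into SQL text is only reasonable when the attribute source is trusted.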
was:
We use the ForkEnrichment / JoinEnrichment pattern and want to include filtering
in the join SQL. The filter value comes from a FlowFile attribute:
{code:sql}
-- FlowFile attribute: test = 'NewValue'
SELECT original.*, enrichment.*,'${test}'
FROM original
LEFT OUTER JOIN enrichment
ON original.Underlying = enrichment.Underlying
WHERE enrichment.MyField = '${test}'
{code}
However, this doesn't work because JoinEnrichment doesn't use
evaluateAttributeExpressions.
Additionally, version 1.18 doesn't allow the whole query to be passed as an
attribute.
!screenshot-1.png|width=692,height=431!
> JoinEnrichment SQL strategy doesn't allow attributes in join statement
> ----------------------------------------------------------------------
>
> Key: NIFI-11671
> URL: https://issues.apache.org/jira/browse/NIFI-11671
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.18.0, 1.20.0
> Reporter: Philipp Korniets
> Priority: Major
> Attachments: screenshot-1.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)