[
https://issues.apache.org/jira/browse/FLINK-29765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625522#comment-17625522
]
Aqib Mehmood commented on FLINK-29765:
--------------------------------------
{{We're using a Kafka connector. Below are the configs and the source table:}}
{{ EnvironmentSettings settings =
EnvironmentSettings.inStreamingMode();}}
{{ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
settings);}}
{{ tEnv.executeSql("CREATE TABLE orders (\n" +}}
{{ " data ROW(\n" +}}
{{ " id INT,\n" +}}
{{ " poId INT,\n" +}}
{{ " productId INT,\n" +}}
{{ " name varchar(191),\n" +}}
{{ " quantity INT,\n" +}}
{{ " price decimal(15,4),\n" +}}
{{ " taxType varchar(191),\n" +}}
{{ " taxAmount decimal(15,4),\n" +}}
{{ " subTotalWithoutTax decimal(15,4),\n" +}}
{{ " subTotalWithTax decimal(15,4),\n" +}}
{{ " createdAt TIMESTAMP_LTZ,\n" +}}
{{ " updatedAt TIMESTAMP_LTZ,\n" +}}
{{ " sku varchar(191),\n" +}}
{{ " mrp decimal(15,4)\n" +}}
{{ ")) WITH (\n" +}}
{{ " 'connector' = 'kafka',\n" +}}
{{ " 'topic' = 'orders'"}}
{{ " 'scan.startup.mode' = 'earliest-offset',\n" +}}
{{ " 'format' = 'json',\n" +}}
{{ " 'json.timestamp-format.standard' = 'ISO-8601',\n" +}}
{{ " 'properties.security.protocol' = 'SASL_SSL',\n" +);}}
> SQL query not executing properly
> --------------------------------
>
> Key: FLINK-29765
> URL: https://issues.apache.org/jira/browse/FLINK-29765
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.15.0
> Reporter: Aqib Mehmood
> Priority: Major
>
> I am using this query to compare the difference between the latest and the
> second-to-last price of each order SKU:
> WITH CTE AS (
> SELECT a.sku, a.name, a.updatedAt, b.price FROM (
> SELECT sku, name, max(updatedAt) AS updatedAt from (
> SELECT sku, name, updatedAt FROM wms.PurchaseOrderProduct
> WHERE CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s'))
> not in (
> SELECT CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d
> %H:%m:%s')) FROM (
> SELECT sku, max(updatedAt) as updatedAt from
> wms.PurchaseOrderProduct
> GROUP BY sku
> ) AS x
> )
> ) AS z
> GROUP BY sku, name
> ) AS a
> LEFT JOIN wms.PurchaseOrderProduct b
> ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
> )
> SELECT a.sku, a.name, a.updatedAt AS latestupdatedAt, a.price AS latestPrice,
> b.updatedAt AS lastUpdatedAt, b.price AS lastPrice
> FROM (
> SELECT a.sku, a.name, a.updatedAt, b.price from (
> SELECT sku, name, max(updatedAt) as updatedAt from
> wms.PurchaseOrderProduct
> GROUP BY sku, name
> ) AS a
> LEFT JOIN wms.PurchaseOrderProduct b
> ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
> ) AS a
> LEFT JOIN CTE AS b
> ON a.sku=b.sku AND a.name=b.name;
> The issue is that I'm getting *NULLs* for the columns *lastUpdatedAt* and
> *lastPrice*. But when I run the same query on our prod database, I get the
> desired results. I suspect that Flink is not processing the entire
> query before giving the results.
> I get the desired results for a couple of rows at the beginning of the table,
> in which *lastUpdatedAt* and *lastPrice* are not *NULL*. But after that,
> both columns return only *NULLs*.
> I would like to know why Flink is not executing the above query properly. TIA
--
This message was sent by Atlassian Jira
(v8.20.10#820010)