[ 
https://issues.apache.org/jira/browse/FLINK-29765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17625522#comment-17625522
 ] 

Aqib Mehmood commented on FLINK-29765:
--------------------------------------

{{We're using a Kafka connector. Below are the configs and the source table:}}


{{        EnvironmentSettings settings = 
EnvironmentSettings.inStreamingMode();}}
{{        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);}}
{{        tEnv.executeSql("CREATE TABLE orders (\n" +}}
{{            "    data ROW(\n" +}}
{{            "    id INT,\n" +}}
{{            "    poId INT,\n" +}}
{{            "    productId INT,\n" +}}
{{            "    name varchar(191),\n" +}}
{{            "    quantity  INT,\n" +}}
{{            "    price decimal(15,4),\n" +}}
{{            "    taxType varchar(191),\n" +}}
{{            "    taxAmount decimal(15,4),\n" +}}
{{            "    subTotalWithoutTax  decimal(15,4),\n" +}}
{{            "    subTotalWithTax  decimal(15,4),\n" +}}
{{            "    createdAt TIMESTAMP_LTZ,\n" +}}
{{            "    updatedAt TIMESTAMP_LTZ,\n" +}}
{{            "    sku varchar(191),\n" +}}
{{            "    mrp decimal(15,4)\n" +}}
{{            ")) WITH (\n" +}}
{{            "    'connector' = 'kafka',\n" +}}
{{            "    'topic'     = 'orders'"}}
{{            "    'scan.startup.mode'    = 'earliest-offset',\n" +}}
{{            "    'format'    = 'json',\n" +}}
{{            "    'json.timestamp-format.standard' = 'ISO-8601',\n" +}}
{{            "    'properties.security.protocol' = 'SASL_SSL',\n" +);}}

> SQL query not executing properly
> --------------------------------
>
>                 Key: FLINK-29765
>                 URL: https://issues.apache.org/jira/browse/FLINK-29765
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.15.0
>            Reporter: Aqib Mehmood
>            Priority: Major
>
> I am using this query to compare the difference between the last and 
> second-to-last price of our order SKU:
> WITH CTE AS (
>     SELECT a.sku, a.name, a.updatedAt, b.price FROM (    
>         SELECT sku, name, max(updatedAt) AS updatedAt from (
>             SELECT sku, name, updatedAt FROM wms.PurchaseOrderProduct
>             WHERE CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s')) 
> not in (
>                 SELECT CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d 
> %H:%m:%s')) FROM (
>                     SELECT sku, max(updatedAt) as updatedAt from 
> wms.PurchaseOrderProduct
>                     GROUP BY sku
>                 ) AS x
>             )
>         ) AS z
>         GROUP BY sku, name
>     ) AS a
>     LEFT JOIN wms.PurchaseOrderProduct b
>     ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
> )
> SELECT a.sku, a.name, a.updatedAt AS latestupdatedAt, a.price AS latestPrice, 
> b.updatedAt AS lastUpdatedAt, b.price AS lastPrice
> FROM (
>     SELECT a.sku, a.name, a.updatedAt, b.price from (
>         SELECT sku, name, max(updatedAt) as updatedAt from 
> wms.PurchaseOrderProduct
>         GROUP BY sku, name
>     ) AS a
>     LEFT JOIN wms.PurchaseOrderProduct b
>     ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
> ) AS a
> LEFT JOIN CTE AS b
> ON a.sku=b.sku AND a.name=b.name;
> The issue is that I'm getting *NULLs* for the columns *lastUpdatedAt* and 
> *lastPrice*. But when I run the same query on our prod database, I'm 
> getting the desired results. I suspect that Flink is not processing the 
> entire query before giving the results.
> I get the desired results for a couple of rows at the beginning of the 
> table, where *lastUpdatedAt* and *lastPrice* are not *NULL*. But after 
> that, both columns return only *NULLs*.
> I would like to know why Flink is not executing the above query properly. TIA



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to