[ 
https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-32374:
------------------------------
    Description: 
If the existing JSON plan file is not truncated when it is overwritten, and the 
newly generated JSON plan content is shorter than the previous content, the 
trailing bytes of the old plan remain in the file and the result is invalid JSON.
h4. How to reproduce
{code:sql}
Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 
'blackhole');
[INFO] Execute statement succeed.

Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) 
with ('connector' = 'datagen');
[INFO] Execute statement succeed.

Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select 
if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
[INFO] Execute statement succeed.

Flink SQL> set 'table.plan.force-recompile' = 'true';
[INFO] Execute statement succeed.

Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select 
* from (values (2, 'bye')) T (id, message);
[INFO] Execute statement succeed.
{code}
Run cat -n debug.json and check line 67, where the new plan ends and leftover content from the previous plan begins:
{code:json}
     1  {
     2    "flinkVersion" : "1.17",
     3    "nodes" : [ {
     4      "id" : 15,
     5      "type" : "stream-exec-values_1",
     6      "tuples" : [ [ {
     7        "kind" : "LITERAL",
     8        "value" : "2",
     9        "type" : "INT NOT NULL"
    10      }, {
    11        "kind" : "LITERAL",
    12        "value" : "bye",
    13        "type" : "CHAR(3) NOT NULL"
    14      } ] ],
    15      "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
    16      "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
    17      "inputProperties" : [ ]
    18    }, {
    19      "id" : 16,
    20      "type" : "stream-exec-sink_1",
    21      "configuration" : {
    22        "table.exec.sink.keyed-shuffle" : "AUTO",
    23        "table.exec.sink.not-null-enforcer" : "ERROR",
    24        "table.exec.sink.type-length-enforcer" : "IGNORE",
    25        "table.exec.sink.upsert-materialize" : "AUTO"
    26      },
    27      "dynamicTableSink" : {
    28        "table" : {
    29          "identifier" : 
"`default_catalog`.`default_database`.`debug_sink`",
    30          "resolvedTable" : {
    31            "schema" : {
    32              "columns" : [ {
    33                "name" : "f0",
    34                "dataType" : "INT"
    35              }, {
    36                "name" : "f1",
    37                "dataType" : "VARCHAR(2147483647)"
    38              } ],
    39              "watermarkSpecs" : [ ]
    40            },
    41            "partitionKeys" : [ ],
    42            "options" : {
    43              "connector" : "blackhole"
    44            }
    45          }
    46        }
    47      },
    48      "inputChangelogMode" : [ "INSERT" ],
    49      "inputProperties" : [ {
    50        "requiredDistribution" : {
    51          "type" : "UNKNOWN"
    52        },
    53        "damBehavior" : "PIPELINED",
    54        "priority" : 0
    55      } ],
    56      "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
    57      "description" : 
"Sink(table=[default_catalog.default_database.debug_sink], fields=[id, 
message])"
    58    } ],
    59    "edges" : [ {
    60      "source" : 15,
    61      "target" : 16,
    62      "shuffle" : {
    63        "type" : "FORWARD"
    64      },
    65      "shuffleMode" : "PIPELINED"
    66    } ]
    67  } "$CONCAT$1",
    68        "operands" : [ {
    69          "kind" : "INPUT_REF",
    70          "inputIndex" : 2,
    71          "type" : "VARCHAR(2147483647)"
    72        }, {
    73          "kind" : "INPUT_REF",
    74          "inputIndex" : 3,
    75          "type" : "VARCHAR(2147483647)"
    76        } ],
    77        "type" : "VARCHAR(2147483647)"
    78      } ],
    79      "condition" : null,
    80      "inputProperties" : [ {
    81        "requiredDistribution" : {
    82          "type" : "UNKNOWN"
    83        },
    84        "damBehavior" : "PIPELINED",
    85        "priority" : 0
    86      } ],
    87      "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
    88      "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, 
CONCAT(f2, f3) AS f1])"
    89    }, {
    90      "id" : 14,
    91      "type" : "stream-exec-sink_1",
    92      "configuration" : {
    93        "table.exec.sink.keyed-shuffle" : "AUTO",
    94        "table.exec.sink.not-null-enforcer" : "ERROR",
    95        "table.exec.sink.type-length-enforcer" : "IGNORE",
    96        "table.exec.sink.upsert-materialize" : "AUTO"
    97      },
    98      "dynamicTableSink" : {
    99        "table" : {
   100          "identifier" : 
"`default_catalog`.`default_database`.`debug_sink`",
   101          "resolvedTable" : {
   102            "schema" : {
   103              "columns" : [ {
   104                "name" : "f0",
   105                "dataType" : "INT"
   106              }, {
   107                "name" : "f1",
   108                "dataType" : "VARCHAR(2147483647)"
   109              } ],
   110              "watermarkSpecs" : [ ]
   111            },
   112            "partitionKeys" : [ ],
   113            "options" : {
   114              "connector" : "blackhole"
   115            }
   116          }
   117        }
   118      },
   119      "inputChangelogMode" : [ "INSERT" ],
   120      "inputProperties" : [ {
   121        "requiredDistribution" : {
   122          "type" : "UNKNOWN"
   123        },
   124        "damBehavior" : "PIPELINED",
   125        "priority" : 0
   126      } ],
   127      "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
   128      "description" : 
"Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
   129    } ],
   130    "edges" : [ {
   131      "source" : 12,
   132      "target" : 13,
   133      "shuffle" : {
   134        "type" : "FORWARD"
   135      },
   136      "shuffleMode" : "PIPELINED"
   137    }, {
   138      "source" : 13,
   139      "target" : 14,
   140      "shuffle" : {
   141        "type" : "FORWARD"
   142      },
   143      "shuffleMode" : "PIPELINED"
   144    } ]
   145  }
{code}

> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for 
> overwriting
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-32374
>                 URL: https://issues.apache.org/jira/browse/FLINK-32374
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
>            Reporter: Jane Chan
>            Priority: Major
>             Fix For: 1.18.0
>
>
> If the existing JSON plan file is not truncated when it is overwritten, and the 
> newly generated JSON plan content is shorter than the previous content, the 
> trailing bytes of the old plan remain in the file and the result is invalid JSON.
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 
> 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) 
> with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> Run cat -n debug.json and check line 67, where the new plan ends and leftover content from the previous plan begins:
> {code:json}
>      1        {
>      2          "flinkVersion" : "1.17",
>      3          "nodes" : [ {
>      4            "id" : 15,
>      5            "type" : "stream-exec-values_1",
>      6            "tuples" : [ [ {
>      7              "kind" : "LITERAL",
>      8              "value" : "2",
>      9              "type" : "INT NOT NULL"
>     10            }, {
>     11              "kind" : "LITERAL",
>     12              "value" : "bye",
>     13              "type" : "CHAR(3) NOT NULL"
>     14            } ] ],
>     15            "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
>     16            "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
>     17            "inputProperties" : [ ]
>     18          }, {
>     19            "id" : 16,
>     20            "type" : "stream-exec-sink_1",
>     21            "configuration" : {
>     22              "table.exec.sink.keyed-shuffle" : "AUTO",
>     23              "table.exec.sink.not-null-enforcer" : "ERROR",
>     24              "table.exec.sink.type-length-enforcer" : "IGNORE",
>     25              "table.exec.sink.upsert-materialize" : "AUTO"
>     26            },
>     27            "dynamicTableSink" : {
>     28              "table" : {
>     29                "identifier" : 
> "`default_catalog`.`default_database`.`debug_sink`",
>     30                "resolvedTable" : {
>     31                  "schema" : {
>     32                    "columns" : [ {
>     33                      "name" : "f0",
>     34                      "dataType" : "INT"
>     35                    }, {
>     36                      "name" : "f1",
>     37                      "dataType" : "VARCHAR(2147483647)"
>     38                    } ],
>     39                    "watermarkSpecs" : [ ]
>     40                  },
>     41                  "partitionKeys" : [ ],
>     42                  "options" : {
>     43                    "connector" : "blackhole"
>     44                  }
>     45                }
>     46              }
>     47            },
>     48            "inputChangelogMode" : [ "INSERT" ],
>     49            "inputProperties" : [ {
>     50              "requiredDistribution" : {
>     51                "type" : "UNKNOWN"
>     52              },
>     53              "damBehavior" : "PIPELINED",
>     54              "priority" : 0
>     55            } ],
>     56            "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
>     57            "description" : 
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, 
> message])"
>     58          } ],
>     59          "edges" : [ {
>     60            "source" : 15,
>     61            "target" : 16,
>     62            "shuffle" : {
>     63              "type" : "FORWARD"
>     64            },
>     65            "shuffleMode" : "PIPELINED"
>     66          } ]
>     67        } "$CONCAT$1",
>     68              "operands" : [ {
>     69                "kind" : "INPUT_REF",
>     70                "inputIndex" : 2,
>     71                "type" : "VARCHAR(2147483647)"
>     72              }, {
>     73                "kind" : "INPUT_REF",
>     74                "inputIndex" : 3,
>     75                "type" : "VARCHAR(2147483647)"
>     76              } ],
>     77              "type" : "VARCHAR(2147483647)"
>     78            } ],
>     79            "condition" : null,
>     80            "inputProperties" : [ {
>     81              "requiredDistribution" : {
>     82                "type" : "UNKNOWN"
>     83              },
>     84              "damBehavior" : "PIPELINED",
>     85              "priority" : 0
>     86            } ],
>     87            "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>     88            "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, 
> CONCAT(f2, f3) AS f1])"
>     89          }, {
>     90            "id" : 14,
>     91            "type" : "stream-exec-sink_1",
>     92            "configuration" : {
>     93              "table.exec.sink.keyed-shuffle" : "AUTO",
>     94              "table.exec.sink.not-null-enforcer" : "ERROR",
>     95              "table.exec.sink.type-length-enforcer" : "IGNORE",
>     96              "table.exec.sink.upsert-materialize" : "AUTO"
>     97            },
>     98            "dynamicTableSink" : {
>     99              "table" : {
>    100                "identifier" : 
> "`default_catalog`.`default_database`.`debug_sink`",
>    101                "resolvedTable" : {
>    102                  "schema" : {
>    103                    "columns" : [ {
>    104                      "name" : "f0",
>    105                      "dataType" : "INT"
>    106                    }, {
>    107                      "name" : "f1",
>    108                      "dataType" : "VARCHAR(2147483647)"
>    109                    } ],
>    110                    "watermarkSpecs" : [ ]
>    111                  },
>    112                  "partitionKeys" : [ ],
>    113                  "options" : {
>    114                    "connector" : "blackhole"
>    115                  }
>    116                }
>    117              }
>    118            },
>    119            "inputChangelogMode" : [ "INSERT" ],
>    120            "inputProperties" : [ {
>    121              "requiredDistribution" : {
>    122                "type" : "UNKNOWN"
>    123              },
>    124              "damBehavior" : "PIPELINED",
>    125              "priority" : 0
>    126            } ],
>    127            "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
>    128            "description" : 
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
>    129          } ],
>    130          "edges" : [ {
>    131            "source" : 12,
>    132            "target" : 13,
>    133            "shuffle" : {
>    134              "type" : "FORWARD"
>    135            },
>    136            "shuffleMode" : "PIPELINED"
>    137          }, {
>    138            "source" : 13,
>    139            "target" : 14,
>    140            "shuffle" : {
>    141              "type" : "FORWARD"
>    142            },
>    143            "shuffleMode" : "PIPELINED"
>    144          } ]
>    145        }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to