[
https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jane Chan updated FLINK-32374:
------------------------------
Description:
If the existing JSON plan file is not truncated when overwriting, and the newly
generated JSON plan content is shorter than the previous JSON plan content,
the resulting plan file will contain leftover bytes from the old plan and be
invalid JSON.
h4. How to reproduce
{code:sql}
Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' =
'blackhole');
[INFO] Execute statement succeed.
Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string)
with ('connector' = 'datagen');
[INFO] Execute statement succeed.
Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select
if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
[INFO] Execute statement succeed.
Flink SQL> set 'table.plan.force-recompile' = 'true';
[INFO] Execute statement succeed.
Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select
* from (values (2, 'bye')) T (id, message);
[INFO] Execute statement succeed.
{code}
cat -n debug.json, and check L#67
{code:json}
1 {
2 "flinkVersion" : "1.17",
3 "nodes" : [ {
4 "id" : 15,
5 "type" : "stream-exec-values_1",
6 "tuples" : [ [ {
7 "kind" : "LITERAL",
8 "value" : "2",
9 "type" : "INT NOT NULL"
10 }, {
11 "kind" : "LITERAL",
12 "value" : "bye",
13 "type" : "CHAR(3) NOT NULL"
14 } ] ],
15 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
16 "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
17 "inputProperties" : [ ]
18 }, {
19 "id" : 16,
20 "type" : "stream-exec-sink_1",
21 "configuration" : {
22 "table.exec.sink.keyed-shuffle" : "AUTO",
23 "table.exec.sink.not-null-enforcer" : "ERROR",
24 "table.exec.sink.type-length-enforcer" : "IGNORE",
25 "table.exec.sink.upsert-materialize" : "AUTO"
26 },
27 "dynamicTableSink" : {
28 "table" : {
29 "identifier" :
"`default_catalog`.`default_database`.`debug_sink`",
30 "resolvedTable" : {
31 "schema" : {
32 "columns" : [ {
33 "name" : "f0",
34 "dataType" : "INT"
35 }, {
36 "name" : "f1",
37 "dataType" : "VARCHAR(2147483647)"
38 } ],
39 "watermarkSpecs" : [ ]
40 },
41 "partitionKeys" : [ ],
42 "options" : {
43 "connector" : "blackhole"
44 }
45 }
46 }
47 },
48 "inputChangelogMode" : [ "INSERT" ],
49 "inputProperties" : [ {
50 "requiredDistribution" : {
51 "type" : "UNKNOWN"
52 },
53 "damBehavior" : "PIPELINED",
54 "priority" : 0
55 } ],
56 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
57 "description" :
"Sink(table=[default_catalog.default_database.debug_sink], fields=[id,
message])"
58 } ],
59 "edges" : [ {
60 "source" : 15,
61 "target" : 16,
62 "shuffle" : {
63 "type" : "FORWARD"
64 },
65 "shuffleMode" : "PIPELINED"
66 } ]
67 } "$CONCAT$1",
68 "operands" : [ {
69 "kind" : "INPUT_REF",
70 "inputIndex" : 2,
71 "type" : "VARCHAR(2147483647)"
72 }, {
73 "kind" : "INPUT_REF",
74 "inputIndex" : 3,
75 "type" : "VARCHAR(2147483647)"
76 } ],
77 "type" : "VARCHAR(2147483647)"
78 } ],
79 "condition" : null,
80 "inputProperties" : [ {
81 "requiredDistribution" : {
82 "type" : "UNKNOWN"
83 },
84 "damBehavior" : "PIPELINED",
85 "priority" : 0
86 } ],
87 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
88 "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0,
CONCAT(f2, f3) AS f1])"
89 }, {
90 "id" : 14,
91 "type" : "stream-exec-sink_1",
92 "configuration" : {
93 "table.exec.sink.keyed-shuffle" : "AUTO",
94 "table.exec.sink.not-null-enforcer" : "ERROR",
95 "table.exec.sink.type-length-enforcer" : "IGNORE",
96 "table.exec.sink.upsert-materialize" : "AUTO"
97 },
98 "dynamicTableSink" : {
99 "table" : {
100 "identifier" :
"`default_catalog`.`default_database`.`debug_sink`",
101 "resolvedTable" : {
102 "schema" : {
103 "columns" : [ {
104 "name" : "f0",
105 "dataType" : "INT"
106 }, {
107 "name" : "f1",
108 "dataType" : "VARCHAR(2147483647)"
109 } ],
110 "watermarkSpecs" : [ ]
111 },
112 "partitionKeys" : [ ],
113 "options" : {
114 "connector" : "blackhole"
115 }
116 }
117 }
118 },
119 "inputChangelogMode" : [ "INSERT" ],
120 "inputProperties" : [ {
121 "requiredDistribution" : {
122 "type" : "UNKNOWN"
123 },
124 "damBehavior" : "PIPELINED",
125 "priority" : 0
126 } ],
127 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
128 "description" :
"Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
129 } ],
130 "edges" : [ {
131 "source" : 12,
132 "target" : 13,
133 "shuffle" : {
134 "type" : "FORWARD"
135 },
136 "shuffleMode" : "PIPELINED"
137 }, {
138 "source" : 13,
139 "target" : 14,
140 "shuffle" : {
141 "type" : "FORWARD"
142 },
143 "shuffleMode" : "PIPELINED"
144 } ]
145 }
{code}
> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for
> overwriting
> --------------------------------------------------------------------------------------
>
> Key: FLINK-32374
> URL: https://issues.apache.org/jira/browse/FLINK-32374
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
> Reporter: Jane Chan
> Priority: Major
> Fix For: 1.18.0
>
>
> If the existing JSON plan file is not truncated when overwriting, and the newly
> generated JSON plan content is shorter than the previous JSON plan content,
> the resulting plan file will contain leftover bytes from the old plan and be
> invalid JSON.
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' =
> 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string)
> with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink
> select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink
> select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> cat -n debug.json, and check L#67
> {code:json}
> 1 {
> 2 "flinkVersion" : "1.17",
> 3 "nodes" : [ {
> 4 "id" : 15,
> 5 "type" : "stream-exec-values_1",
> 6 "tuples" : [ [ {
> 7 "kind" : "LITERAL",
> 8 "value" : "2",
> 9 "type" : "INT NOT NULL"
> 10 }, {
> 11 "kind" : "LITERAL",
> 12 "value" : "bye",
> 13 "type" : "CHAR(3) NOT NULL"
> 14 } ] ],
> 15 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3)
> NOT NULL>",
> 16 "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
> 17 "inputProperties" : [ ]
> 18 }, {
> 19 "id" : 16,
> 20 "type" : "stream-exec-sink_1",
> 21 "configuration" : {
> 22 "table.exec.sink.keyed-shuffle" : "AUTO",
> 23 "table.exec.sink.not-null-enforcer" : "ERROR",
> 24 "table.exec.sink.type-length-enforcer" : "IGNORE",
> 25 "table.exec.sink.upsert-materialize" : "AUTO"
> 26 },
> 27 "dynamicTableSink" : {
> 28 "table" : {
> 29 "identifier" :
> "`default_catalog`.`default_database`.`debug_sink`",
> 30 "resolvedTable" : {
> 31 "schema" : {
> 32 "columns" : [ {
> 33 "name" : "f0",
> 34 "dataType" : "INT"
> 35 }, {
> 36 "name" : "f1",
> 37 "dataType" : "VARCHAR(2147483647)"
> 38 } ],
> 39 "watermarkSpecs" : [ ]
> 40 },
> 41 "partitionKeys" : [ ],
> 42 "options" : {
> 43 "connector" : "blackhole"
> 44 }
> 45 }
> 46 }
> 47 },
> 48 "inputChangelogMode" : [ "INSERT" ],
> 49 "inputProperties" : [ {
> 50 "requiredDistribution" : {
> 51 "type" : "UNKNOWN"
> 52 },
> 53 "damBehavior" : "PIPELINED",
> 54 "priority" : 0
> 55 } ],
> 56 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3)
> NOT NULL>",
> 57 "description" :
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[id,
> message])"
> 58 } ],
> 59 "edges" : [ {
> 60 "source" : 15,
> 61 "target" : 16,
> 62 "shuffle" : {
> 63 "type" : "FORWARD"
> 64 },
> 65 "shuffleMode" : "PIPELINED"
> 66 } ]
> 67 } "$CONCAT$1",
> 68 "operands" : [ {
> 69 "kind" : "INPUT_REF",
> 70 "inputIndex" : 2,
> 71 "type" : "VARCHAR(2147483647)"
> 72 }, {
> 73 "kind" : "INPUT_REF",
> 74 "inputIndex" : 3,
> 75 "type" : "VARCHAR(2147483647)"
> 76 } ],
> 77 "type" : "VARCHAR(2147483647)"
> 78 } ],
> 79 "condition" : null,
> 80 "inputProperties" : [ {
> 81 "requiredDistribution" : {
> 82 "type" : "UNKNOWN"
> 83 },
> 84 "damBehavior" : "PIPELINED",
> 85 "priority" : 0
> 86 } ],
> 87 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> 88 "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0,
> CONCAT(f2, f3) AS f1])"
> 89 }, {
> 90 "id" : 14,
> 91 "type" : "stream-exec-sink_1",
> 92 "configuration" : {
> 93 "table.exec.sink.keyed-shuffle" : "AUTO",
> 94 "table.exec.sink.not-null-enforcer" : "ERROR",
> 95 "table.exec.sink.type-length-enforcer" : "IGNORE",
> 96 "table.exec.sink.upsert-materialize" : "AUTO"
> 97 },
> 98 "dynamicTableSink" : {
> 99 "table" : {
> 100 "identifier" :
> "`default_catalog`.`default_database`.`debug_sink`",
> 101 "resolvedTable" : {
> 102 "schema" : {
> 103 "columns" : [ {
> 104 "name" : "f0",
> 105 "dataType" : "INT"
> 106 }, {
> 107 "name" : "f1",
> 108 "dataType" : "VARCHAR(2147483647)"
> 109 } ],
> 110 "watermarkSpecs" : [ ]
> 111 },
> 112 "partitionKeys" : [ ],
> 113 "options" : {
> 114 "connector" : "blackhole"
> 115 }
> 116 }
> 117 }
> 118 },
> 119 "inputChangelogMode" : [ "INSERT" ],
> 120 "inputProperties" : [ {
> 121 "requiredDistribution" : {
> 122 "type" : "UNKNOWN"
> 123 },
> 124 "damBehavior" : "PIPELINED",
> 125 "priority" : 0
> 126 } ],
> 127 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> 128 "description" :
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
> 129 } ],
> 130 "edges" : [ {
> 131 "source" : 12,
> 132 "target" : 13,
> 133 "shuffle" : {
> 134 "type" : "FORWARD"
> 135 },
> 136 "shuffleMode" : "PIPELINED"
> 137 }, {
> 138 "source" : 13,
> 139 "target" : 14,
> 140 "shuffle" : {
> 141 "type" : "FORWARD"
> 142 },
> 143 "shuffleMode" : "PIPELINED"
> 144 } ]
> 145 }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)