[ https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-32374:
-----------------------------------
Labels: pull-request-available (was: )
> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting
> --------------------------------------------------------------------------------------
>
> Key: FLINK-32374
> URL: https://issues.apache.org/jira/browse/FLINK-32374
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
> Reporter: Jane Chan
> Assignee: Jane Chan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.18.0
>
>
> If the existing JSON plan file is not truncated when it is overwritten, and the newly
> generated JSON plan content is shorter than the previous content, the leftover tail of
> the old file makes the resulting plan invalid JSON.
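> The failure mode can be reproduced outside Flink with plain NIO. The following is a
> minimal sketch (not Flink code; the file name and contents are made up): opening an
> existing file with only WRITE, without TRUNCATE_EXISTING, starts writing at position 0
> but leaves any bytes of the old content that extend past the new content.
> {code:java}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> public class StaleTailDemo {
>     public static void main(String[] args) throws IOException {
>         Path plan = Files.createTempFile("debug", ".json");
>
>         // First write: a longer "plan".
>         Files.write(plan, "{\"nodes\":[1,2,3,4,5,6,7,8,9]}".getBytes(StandardCharsets.UTF_8));
>
>         // Second write: shorter content, opened with WRITE only (no TRUNCATE_EXISTING).
>         // Writing starts at position 0, but the tail of the old content survives.
>         Files.write(plan, "{\"nodes\":[1]}".getBytes(StandardCharsets.UTF_8),
>                 StandardOpenOption.WRITE);
>
>         // Prints {"nodes":[1]},3,4,5,6,7,8,9]} -- the stale tail makes it invalid JSON.
>         System.out.println(new String(Files.readAllBytes(plan), StandardCharsets.UTF_8));
>     }
> }
> {code}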
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> Run {{cat -n debug.json}} and check line 67 of the output:
> {code:json}
> 1 {
> 2 "flinkVersion" : "1.17",
> 3 "nodes" : [ {
> 4 "id" : 15,
> 5 "type" : "stream-exec-values_1",
> 6 "tuples" : [ [ {
> 7 "kind" : "LITERAL",
> 8 "value" : "2",
> 9 "type" : "INT NOT NULL"
> 10 }, {
> 11 "kind" : "LITERAL",
> 12 "value" : "bye",
> 13 "type" : "CHAR(3) NOT NULL"
> 14 } ] ],
> 15 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3)
> NOT NULL>",
> 16 "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
> 17 "inputProperties" : [ ]
> 18 }, {
> 19 "id" : 16,
> 20 "type" : "stream-exec-sink_1",
> 21 "configuration" : {
> 22 "table.exec.sink.keyed-shuffle" : "AUTO",
> 23 "table.exec.sink.not-null-enforcer" : "ERROR",
> 24 "table.exec.sink.type-length-enforcer" : "IGNORE",
> 25 "table.exec.sink.upsert-materialize" : "AUTO"
> 26 },
> 27 "dynamicTableSink" : {
> 28 "table" : {
> 29 "identifier" :
> "`default_catalog`.`default_database`.`debug_sink`",
> 30 "resolvedTable" : {
> 31 "schema" : {
> 32 "columns" : [ {
> 33 "name" : "f0",
> 34 "dataType" : "INT"
> 35 }, {
> 36 "name" : "f1",
> 37 "dataType" : "VARCHAR(2147483647)"
> 38 } ],
> 39 "watermarkSpecs" : [ ]
> 40 },
> 41 "partitionKeys" : [ ],
> 42 "options" : {
> 43 "connector" : "blackhole"
> 44 }
> 45 }
> 46 }
> 47 },
> 48 "inputChangelogMode" : [ "INSERT" ],
> 49 "inputProperties" : [ {
> 50 "requiredDistribution" : {
> 51 "type" : "UNKNOWN"
> 52 },
> 53 "damBehavior" : "PIPELINED",
> 54 "priority" : 0
> 55 } ],
> 56 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3)
> NOT NULL>",
> 57 "description" :
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[id,
> message])"
> 58 } ],
> 59 "edges" : [ {
> 60 "source" : 15,
> 61 "target" : 16,
> 62 "shuffle" : {
> 63 "type" : "FORWARD"
> 64 },
> 65 "shuffleMode" : "PIPELINED"
> 66 } ]
> 67 } "$CONCAT$1",
> 68 "operands" : [ {
> 69 "kind" : "INPUT_REF",
> 70 "inputIndex" : 2,
> 71 "type" : "VARCHAR(2147483647)"
> 72 }, {
> 73 "kind" : "INPUT_REF",
> 74 "inputIndex" : 3,
> 75 "type" : "VARCHAR(2147483647)"
> 76 } ],
> 77 "type" : "VARCHAR(2147483647)"
> 78 } ],
> 79 "condition" : null,
> 80 "inputProperties" : [ {
> 81 "requiredDistribution" : {
> 82 "type" : "UNKNOWN"
> 83 },
> 84 "damBehavior" : "PIPELINED",
> 85 "priority" : 0
> 86 } ],
> 87 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> 88 "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0,
> CONCAT(f2, f3) AS f1])"
> 89 }, {
> 90 "id" : 14,
> 91 "type" : "stream-exec-sink_1",
> 92 "configuration" : {
> 93 "table.exec.sink.keyed-shuffle" : "AUTO",
> 94 "table.exec.sink.not-null-enforcer" : "ERROR",
> 95 "table.exec.sink.type-length-enforcer" : "IGNORE",
> 96 "table.exec.sink.upsert-materialize" : "AUTO"
> 97 },
> 98 "dynamicTableSink" : {
> 99 "table" : {
> 100 "identifier" :
> "`default_catalog`.`default_database`.`debug_sink`",
> 101 "resolvedTable" : {
> 102 "schema" : {
> 103 "columns" : [ {
> 104 "name" : "f0",
> 105 "dataType" : "INT"
> 106 }, {
> 107 "name" : "f1",
> 108 "dataType" : "VARCHAR(2147483647)"
> 109 } ],
> 110 "watermarkSpecs" : [ ]
> 111 },
> 112 "partitionKeys" : [ ],
> 113 "options" : {
> 114 "connector" : "blackhole"
> 115 }
> 116 }
> 117 }
> 118 },
> 119 "inputChangelogMode" : [ "INSERT" ],
> 120 "inputProperties" : [ {
> 121 "requiredDistribution" : {
> 122 "type" : "UNKNOWN"
> 123 },
> 124 "damBehavior" : "PIPELINED",
> 125 "priority" : 0
> 126 } ],
> 127 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
> 128 "description" :
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])"
> 129 } ],
> 130 "edges" : [ {
> 131 "source" : 12,
> 132 "target" : 13,
> 133 "shuffle" : {
> 134 "type" : "FORWARD"
> 135 },
> 136 "shuffleMode" : "PIPELINED"
> 137 }, {
> 138 "source" : 13,
> 139 "target" : 14,
> 140 "shuffle" : {
> 141 "type" : "FORWARD"
> 142 },
> 143 "shuffleMode" : "PIPELINED"
> 144 } ]
> 145 }
> {code}
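> Everything up to line 66 is the newly compiled plan; from line 67 onward the file still
> contains the tail of the previously written, longer plan. As the summary suggests, the
> overwrite path should open the file with TRUNCATE_EXISTING so the new plan fully replaces
> the old one. A minimal sketch of that idea follows ({{PlanFileWriter}} and
> {{overwritePlan}} are hypothetical names, not the actual patch to
> ExecNodeGraphInternalPlan#writeToFile):
> {code:java}
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> final class PlanFileWriter {
>     // Hypothetical helper illustrating the intended overwrite behavior.
>     static void overwritePlan(Path file, String json) throws IOException {
>         Files.write(
>                 file,
>                 json.getBytes(StandardCharsets.UTF_8),
>                 StandardOpenOption.CREATE,
>                 StandardOpenOption.WRITE,
>                 // Truncate first so a shorter plan cannot leave stale bytes behind.
>                 StandardOpenOption.TRUNCATE_EXISTING);
>     }
> }
> {code}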
--
This message was sent by Atlassian Jira
(v8.20.10#820010)