[
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330904#comment-17330904
]
zoucao commented on FLINK-22437:
--------------------------------
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.fs_sink], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])
+- LogicalProject(timestamp=[$0], time=[$1], id=[$2], product=[$3], price=[$4], cansell=[$5], sellednum=[$6], dt=[$7], hour=[$8], min=[$9])
   +- LogicalTableScan(table=[[myHive, flink_sql_online_test, hive_sink]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.fs_sink], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])
+- Sort(orderBy=[dt ASC, hour ASC, min ASC])
   +- TableSourceScan(table=[[myHive, flink_sql_online_test, hive_sink]], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: HiveSource-flink_sql_online_test.hive_sink

Stage 2 : Operator
    content : Sort(orderBy=[dt ASC, hour ASC, min ASC])
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Filter
    ship_strategy : REBALANCE

Stage 4 : Data Sink
    content : Sink: Unnamed
    ship_strategy : FORWARD
> Missing parallelism for filter operator in batch mode
> -----------------------------------------------------
>
> Key: FLINK-22437
> URL: https://issues.apache.org/jira/browse/FLINK-22437
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2
> Reporter: zoucao
> Priority: Major
>
> When I execute the batch SQL below in flink-1.12.2, I find lots of small
> files in HDFS. The filesystem connector uses `GroupedPartitionWriter`, which
> closes the current partition whenever a new record does not belong to it.
> This happens when records from more than one partition are sent to the
> filesystem sink at the same time. The Hive source determines its parallelism
> from the number of files and partitions, and the sort operator inherits that
> parallelism, but in `CommonPhysicalSink#createSinkTransformation` a filter
> operator is added to support `SinkNotNullEnforcer` without any parallelism
> set, so the filesystem sink operator cannot get the correct parallelism from
> its input stream.
> {code:java}
> CREATE CATALOG myHive with (
> 'type'='hive',
> 'property-version'='1',
> 'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> -- `timestamp` BIGINT,
> -- `time` STRING,
> -- id BIGINT,
> -- product STRING,
> -- price DOUBLE,
> -- canSell STRING,
> -- selledNum BIGINT
> -- ) PARTITIONED BY (
> -- dt STRING,
> -- `hour` STRING,
> -- `min` STRING
> -- ) TBLPROPERTIES (
> -- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> -- 'sink.partition-commit.trigger'='partition-time',
> -- 'sink.partition-commit.delay'='1 min',
> -- 'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
> `timestamp` BIGINT,
> `time` STRING,
> id BIGINT,
> product STRING,
> price DOUBLE,
> canSell STRING,
> selledNum BIGINT,
> dt STRING,
> `hour` STRING,
> `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
> 'connector'='filesystem',
> 'path'='hdfs://XXXX',
> 'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by setting a parallelism for it, like:
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
> .setParallelism(inputTransformation.getParallelism)
> {code}
>
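To make the small-files mechanism described above concrete, here is a minimal, self-contained Java sketch. The class and method names are hypothetical (this is not Flink's actual `GroupedPartitionWriter` API); it only models the reported behavior: the writer closes its current file and opens a new one every time the incoming record's partition differs from the one currently open, so interleaved partition keys (what you get when the sort's parallelism is lost downstream) create far more files than sorted input.

```java
import java.util.List;

// Hypothetical simulation of a grouped partition writer: one file is open at a
// time, and a partition switch closes it and opens a new one.
public class GroupedWriterSim {

    /** Returns how many files are created for the given sequence of partition keys. */
    public static int filesCreated(List<String> partitionKeys) {
        int files = 0;
        String current = null;
        for (String key : partitionKeys) {
            if (!key.equals(current)) {
                // Partition switch: close the previous file, open a new one.
                files++;
                current = key;
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Sorted input (sort parallelism preserved): one file per partition.
        System.out.println(filesCreated(List.of("dt=1", "dt=1", "dt=2", "dt=2"))); // 2
        // Interleaved input (sort effect lost through the filter): one file per switch.
        System.out.println(filesCreated(List.of("dt=1", "dt=2", "dt=1", "dt=2"))); // 4
    }
}
```

With real partition counts and record volumes, the interleaved case multiplies into the "lots of small files" observed in HDFS, which is why carrying the input parallelism through the not-null-enforcer filter matters.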
--
This message was sent by Atlassian Jira
(v8.3.4#803005)