[
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348093#comment-17348093
]
zoucao commented on FLINK-22437:
--------------------------------
hi [~wenlong.lwl], this problem has been fixed in 1.13 because the related code
was refactored there, and the refactoring is hard to backport, but it is still a
real problem in 1.12. If it doesn't cost much, we'd better fix it?
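To illustrate the small-files effect, here is a simplified simulation (not Flink's real classes, just a hypothetical sketch of what `GroupedPartitionWriter` does): the writer closes its current file whenever the partition of an incoming record differs from the open one, so once records of different partitions interleave, every partition change creates a new file instead of one file per partition.

{code:java}
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for GroupedPartitionWriter: it keeps one file open
// and closes it whenever the partition of the incoming record changes.
// All names here are illustrative, not Flink's actual API.
public class SmallFilesDemo {

    static int countFiles(List<String> partitionOfEachRecord) {
        int filesCreated = 0;
        String openPartition = null;
        for (String p : partitionOfEachRecord) {
            if (!p.equals(openPartition)) {
                // the previous file is closed and a new one is opened
                filesCreated++;
                openPartition = p;
            }
        }
        return filesCreated;
    }

    public static void main(String[] args) {
        // Input grouped by partition (what the upstream sort intends): one file per partition.
        List<String> grouped = Arrays.asList("dt=1", "dt=1", "dt=2", "dt=2");
        // Interleaved input, as happens when the sink runs with the wrong
        // parallelism and the grouping is lost: one file per partition change.
        List<String> interleaved = Arrays.asList("dt=1", "dt=2", "dt=1", "dt=2");
        System.out.println(countFiles(grouped));      // 2
        System.out.println(countFiles(interleaved));  // 4
    }
}
{code}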
> Missing parallelism for filter operator in batch mode
> -----------------------------------------------------
>
> Key: FLINK-22437
> URL: https://issues.apache.org/jira/browse/FLINK-22437
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2
> Reporter: zoucao
> Priority: Major
>
> When I execute the following batch SQL in flink-1.12.2, I find lots of small
> files in HDFS. In the filesystem connector, `GroupedPartitionWriter` is used,
> and it closes the last partition whenever a new record does not belong to the
> existing partition. This phenomenon occurs when records of more than one
> partition are sent to the filesystem sink at the same time. The Hive source
> can determine its parallelism from the number of files and partitions, and
> that parallelism is propagated through the sort operator, but in
> `CommonPhysicalSink#createSinkTransformation` a filter operator is added to
> support `SinkNotNullEnforcer` without any parallelism set for it, so the
> filesystem sink operator cannot get the correct parallelism from its input.
> {code:java}
> CREATE CATALOG myHive with (
> 'type'='hive',
> 'property-version'='1',
> 'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> -- `timestamp` BIGINT,
> -- `time` STRING,
> -- id BIGINT,
> -- product STRING,
> -- price DOUBLE,
> -- canSell STRING,
> -- selledNum BIGINT
> -- ) PARTITIONED BY (
> -- dt STRING,
> -- `hour` STRING,
> -- `min` STRING
> -- ) TBLPROPERTIES (
> -- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> -- 'sink.partition-commit.trigger'='partition-time',
> -- 'sink.partition-commit.delay'='1 min',
> -- 'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
> `timestamp` BIGINT,
> `time` STRING,
> id BIGINT,
> product STRING,
> price DOUBLE,
> canSell STRING,
> selledNum BIGINT,
> dt STRING,
> `hour` STRING,
> `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
> 'connector'='filesystem',
> 'path'='hdfs://XXXX',
> 'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by setting a parallelism for it, like:
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
> .setParallelism(inputTransformation.getParallelism)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)