[
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-22437:
-----------------------------------
Labels: pull-request-available stale-assigned (was: pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue is assigned but has not
received an update in 14, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a
comment updating the community on your progress. If this issue is waiting on
feedback, please consider this a reminder to the committer/reviewer. Flink is a
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone
else may work on it. If the "warning_label" label is not removed in 7 days, the
issue will be automatically unassigned.
> Miss adding parallesim for filter operator in batch mode
> --------------------------------------------------------
>
> Key: FLINK-22437
> URL: https://issues.apache.org/jira/browse/FLINK-22437
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2
> Reporter: zoucao
> Assignee: zoucao
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> when I execute batch sql as follow in flink-1.12.2, I found lots of small
> files in hdfs. In filesystem connector, `GroupedPartitionWriter` will be
> used, and it close the last partiton if a new record does not belong to the
> existing partition. The phenomenon occurred if there are more than one
> partiton's records are sent to filesystem sink at the same time. Hive source
> can determine parallesim by the number of file and partition, and the
> parallesim will extended by sort operator, but in
> `CommonPhysicalSink#createSinkTransformation`,a filter operator will be add
> to support `SinkNotNullEnforcer`, there is no parallesim set for it, so
> filesystem sink operator can not get the correct parallesim from inputstream.
> {code:java}
> CREATE CATALOG myHive with (
> 'type'='hive',
> 'property-version'='1',
> 'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> -- `timestamp` BIGINT,
> -- `time` STRING,
> -- id BIGINT,
> -- product STRING,
> -- price DOUBLE,
> -- canSell STRING,
> -- selledNum BIGINT
> -- ) PARTITIONED BY (
> -- dt STRING,
> -- `hour` STRING,
> -- `min` STRING
> -- ) TBLPROPERTIES (
> -- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> -- 'sink.partition-commit.trigger'='partition-time',
> -- 'sink.partition-commit.delay'='1 min',
> -- 'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
> `timestamp` BIGINT,
> `time` STRING,
> id BIGINT,
> product STRING,
> price DOUBLE,
> canSell STRING,
> selledNum BIGINT,
> dt STRING,
> `hour` STRING,
> `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
> 'connector'='filesystem',
> 'path'='hdfs://XXXX',
> 'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by adding a parallesim for it just like
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
> .setParallelism(inputTransformation.getParallelism)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)