[
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zoucao updated FLINK-22437:
---------------------------
Description:
when I execute batch sql as follow in flink-1.12.2, I found lots of small files
in hdfs. In filesystem connector, `GroupedPartitionWriter` will be used, and it
close the last partiton if a new record does not belong to the existing
partition. The phenomenon occurred if there are more than one partiton' records
are sent to filesystem sink at the same time. Hive source can determine
parallesim by the number of file and partiton, and the parallesim will extended
by sort operator, but in `CommonPhysicalSink#createSinkTransformation`,a
filter operator will be add to support `SinkNotNullEnforcer`, There is no
parallesim set for it, so filesystem sink operator can not get the correct
parallesim from inputstream.
{code:java}
CREATE CATALOG myHive with (
'type'='hive',
'property-version'='1',
'default-database' = 'flink_sql_online_test'
);
-- SET table.sql-dialect=hive;
-- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
-- `timestamp` BIGINT,
-- `time` STRING,
-- id BIGINT,
-- product STRING,
-- price DOUBLE,
-- canSell STRING,
-- selledNum BIGINT
-- ) PARTITIONED BY (
-- dt STRING,
-- `hour` STRING,
-- `min` STRING
-- ) TBLPROPERTIES (
-- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
-- 'sink.partition-commit.trigger'='partition-time',
-- 'sink.partition-commit.delay'='1 min',
-- 'sink.partition-commit.policy.kind'='metastore,success-file'
-- );
create table fs_sink (
`timestamp` BIGINT,
`time` STRING,
id BIGINT,
product STRING,
price DOUBLE,
canSell STRING,
selledNum BIGINT,
dt STRING,
`hour` STRING,
`min` STRING
) PARTITIONED BY (dt, `hour`, `min`) with (
'connector'='filesystem',
'path'='hdfs://XXXX',
'format'='csv'
);
insert into fs_sink
select * from myHive.flink_sql_online_test.hive_sink;
{code}
I think this problem can be fixed by adding a parallesim for it just like
{code:java}
val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
.setParallelism(inputTransformation.getParallelism)
{code}
was:
when I execute batch sql as follow in flink-1.12.2, I found lots of small files
in hdfs. In filesystem connector, `GroupedPartitionWriter` will be used, and it
close the last partiton if a new record does not belong to the existing
partition. The phenomenon occurred if there are more than one partiton' records
are sent to filesystem sink at the same time. Hive source can determine
parallesim by the number of file and partiton, and the parallesim will extended
by sort operator, but in `CommonPhysicalSink#createSinkTransformation`,a
filter operator will be add to support `SinkNotNullEnforcer`, There is no
parallesim set for it, so filesystem sink operator can not get the correct
parallesim from inputstream.
{code:java}
CREATE CATALOG myHive with (
'type'='hive',
'property-version'='1',
'default-database' = 'flink_sql_online_test'
);
-- SET table.sql-dialect=hive;
-- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
-- `timestamp` BIGINT,
-- `time` STRING,
-- id BIGINT,
-- product STRING,
-- price DOUBLE,
-- canSell STRING,
-- selledNum BIGINT
-- ) PARTITIONED BY (
-- dt STRING,
-- `hour` STRING,
-- `min` STRING
-- ) TBLPROPERTIES (
-- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
-- 'sink.partition-commit.trigger'='partition-time',
-- 'sink.partition-commit.delay'='1 min',
-- 'sink.partition-commit.policy.kind'='metastore,success-file'
-- );
create table fs_sink (
`timestamp` BIGINT,
`time` STRING,
id BIGINT,
product STRING,
price DOUBLE,
canSell STRING,
selledNum BIGINT,
dt STRING,
`hour` STRING,
`min` STRING
) PARTITIONED BY (dt, `hour`, `min`) with (
'connector'='filesystem',
'path'='hdfs://XXXX',
'format'='csv'
);
insert into fs_sink
select * from myHive.flink_sql_online_test.hive_sink;
{code}
I think this problem can be fixed by add a parallesim for it just like
{code:java}
val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
.setParallelism(inputTransformation.getParallelism)
{code}
> Miss adding parallesim for filter operator in batch mode
> --------------------------------------------------------
>
> Key: FLINK-22437
> URL: https://issues.apache.org/jira/browse/FLINK-22437
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.2
> Reporter: zoucao
> Priority: Major
>
> when I execute batch sql as follow in flink-1.12.2, I found lots of small
> files in hdfs. In filesystem connector, `GroupedPartitionWriter` will be
> used, and it close the last partiton if a new record does not belong to the
> existing partition. The phenomenon occurred if there are more than one
> partiton' records are sent to filesystem sink at the same time. Hive source
> can determine parallesim by the number of file and partiton, and the
> parallesim will extended by sort operator, but in
> `CommonPhysicalSink#createSinkTransformation`,a filter operator will be add
> to support `SinkNotNullEnforcer`, There is no parallesim set for it, so
> filesystem sink operator can not get the correct parallesim from inputstream.
> {code:java}
> CREATE CATALOG myHive with (
> 'type'='hive',
> 'property-version'='1',
> 'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> -- `timestamp` BIGINT,
> -- `time` STRING,
> -- id BIGINT,
> -- product STRING,
> -- price DOUBLE,
> -- canSell STRING,
> -- selledNum BIGINT
> -- ) PARTITIONED BY (
> -- dt STRING,
> -- `hour` STRING,
> -- `min` STRING
> -- ) TBLPROPERTIES (
> -- 'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> -- 'sink.partition-commit.trigger'='partition-time',
> -- 'sink.partition-commit.delay'='1 min',
> -- 'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
> `timestamp` BIGINT,
> `time` STRING,
> id BIGINT,
> product STRING,
> price DOUBLE,
> canSell STRING,
> selledNum BIGINT,
> dt STRING,
> `hour` STRING,
> `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
> 'connector'='filesystem',
> 'path'='hdfs://XXXX',
> 'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by adding a parallesim for it just like
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
> .setParallelism(inputTransformation.getParallelism)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)