[
https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dong Lin updated FLINK-31753:
-----------------------------
Description:
DataSet has been deprecated and will be removed from Flink. However, DataStream
CoGroup is still considerably slower than DataSet CoGroup when co-grouping two
bounded streams.
Here are the benchmark results of co-grouping two bounded streams with 4*10^6
records from each stream under different modes. The co-group function is chosen
to be very lightweight so that the benchmark is dominated by Flink's co-group
overhead.
DataSet: 5.6 sec
DataStream batch mode: 15.4 sec
DataStream stream mode with RocksDB: 81 sec
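To put these numbers in proportion, the small sketch below computes each mode's slowdown relative to the DataSet baseline using only the timings listed above. The class name {{CoGroupSlowdown}} is illustrative.

{code:java}
/** Computes how many times slower each mode is than the DataSet baseline. */
class CoGroupSlowdown {
    // Wall-clock times in seconds, taken from the benchmark results above.
    static final double DATASET_SEC = 5.6;
    static final double DATASTREAM_BATCH_SEC = 15.4;
    static final double DATASTREAM_STREAM_ROCKSDB_SEC = 81.0;

    /** Returns modeSec / baselineSec, i.e. the slowdown factor. */
    static double slowdown(double modeSec, double baselineSec) {
        return modeSec / baselineSec;
    }

    public static void main(String[] args) {
        // Prints 2.75x and 14.46x respectively.
        System.out.printf("DataStream batch mode:          %.2fx%n",
                slowdown(DATASTREAM_BATCH_SEC, DATASET_SEC));
        System.out.printf("DataStream stream mode/RocksDB: %.2fx%n",
                slowdown(DATASTREAM_STREAM_ROCKSDB_SEC, DATASET_SEC));
    }
}
{code}

In other words, batch mode is roughly 2.75x slower than DataSet, and stream mode with RocksDB roughly 14.5x slower.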
We should be able to perform the co-group operation in DataStream stream mode
with performance similar to DataSet, so that users don't have to accept a big
performance regression in order to migrate from DataSet to DataStream.
We will first add a utility function in Flink ML to unblock the migration of
some algorithms from Alink to Flink ML.
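For context, once both inputs are bounded, a co-group can be computed by sorting each input on the key and merging the two sorted runs. As far as we know, this sort-merge approach is what DataSet's batch runtime uses for CoGroup. Below is a minimal in-memory sketch of that technique in plain Java. The class {{SortMergeCoGroup}}, its {{coGroup}} method and the {{CoGroupFn}} callback are hypothetical illustrations; they are not the Flink ML utility proposed in this issue, and a real implementation would have to spill to disk rather than sort in memory.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: sort-merge co-group of two bounded, in-memory inputs. */
class SortMergeCoGroup {

    /** Callback invoked once per distinct key with that key's records from each side. */
    interface CoGroupFn<L, R, O> {
        void coGroup(List<L> left, List<R> right, List<O> out);
    }

    static <L, R, K extends Comparable<K>, O> List<O> coGroup(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey,
            CoGroupFn<L, R, O> fn) {
        // Sort phase: sort copies of both inputs by key.
        List<L> ls = new ArrayList<>(left);
        List<R> rs = new ArrayList<>(right);
        ls.sort(Comparator.comparing(leftKey));
        rs.sort(Comparator.comparing(rightKey));

        List<O> out = new ArrayList<>();
        int i = 0, j = 0;
        // Merge phase: the next key is the smaller of the two heads.
        while (i < ls.size() || j < rs.size()) {
            K key;
            if (j >= rs.size()) {
                key = leftKey.apply(ls.get(i));
            } else if (i >= ls.size()) {
                key = rightKey.apply(rs.get(j));
            } else {
                K lk = leftKey.apply(ls.get(i));
                K rk = rightKey.apply(rs.get(j));
                key = lk.compareTo(rk) <= 0 ? lk : rk;
            }
            // Collect the run of records with this key on each side (either may be empty).
            List<L> lGroup = new ArrayList<>();
            while (i < ls.size() && leftKey.apply(ls.get(i)).compareTo(key) == 0) {
                lGroup.add(ls.get(i++));
            }
            List<R> rGroup = new ArrayList<>();
            while (j < rs.size() && rightKey.apply(rs.get(j)).compareTo(key) == 0) {
                rGroup.add(rs.get(j++));
            }
            fn.coGroup(lGroup, rGroup, out);
        }
        return out;
    }
}
{code}

Each key is visited once, and the callback sees all of that key's records from both sides, which is the CoGroup contract. Groups are emitted in ascending key order as a side effect of the sort.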
Here is the code used to benchmark DataSet's CoGroup.
{code:java}
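import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// numRecords, DataGenerator and CountingAndDiscardingSink are defined in the
// benchmark harness and are not shown here.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
env.getConfig().disableGenericTypes();
env.setRestartStrategy(RestartStrategies.noRestart());
env.setParallelism(1);

DataSet<Tuple3<Integer, Integer, Double>> data1 =
        env.fromCollection(
                new DataGenerator(numRecords),
                Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
DataSet<Tuple3<Integer, Integer, Double>> data2 =
        env.fromCollection(
                new DataGenerator(numRecords),
                Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));

// Co-group both inputs on the first tuple field (f0).
data1.coGroup(data2)
        .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
        .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
        .with(
                new RichCoGroupFunction<
                        Tuple3<Integer, Integer, Double>,
                        Tuple3<Integer, Integer, Double>,
                        Integer>() {
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                    }

                    // Deliberately trivial: emit one record per key group so that the
                    // benchmark measures Flink's co-group overhead, not user logic.
                    @Override
                    public void coGroup(
                            Iterable<Tuple3<Integer, Integer, Double>> iterable,
                            Iterable<Tuple3<Integer, Integer, Double>> iterable1,
                            Collector<Integer> collector)
                            throws Exception {
                        collector.collect(1);
                    }
                })
        .write(new CountingAndDiscardingSink(), "/tmp");

// DataSet sinks are lazy; execute() actually runs the job.
env.execute();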
{code}
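As a sanity check on what the benchmark job above computes, here is a hypothetical plain-Java model of it with no Flink involved: records from both inputs are grouped by {{f0}} in a hash map, and a method mirroring the benchmark's {{coGroup()}} is called once per key and emits {{1}}. The nested {{Rec}} class stands in for {{Tuple3<Integer, Integer, Double>}}; {{CoGroupBenchmarkModel}} and all other names are illustrative assumptions.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of the benchmark job: hash-based co-group on f0. */
class CoGroupBenchmarkModel {

    /** Stand-in for Tuple3<Integer, Integer, Double>; only f0 (the key) is used. */
    static final class Rec {
        final int f0;
        final int f1;
        final double f2;

        Rec(int f0, int f1, double f2) {
            this.f0 = f0;
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    /** Mirrors the benchmark's coGroup(): ignores both groups and emits a constant. */
    static void coGroup(List<Rec> first, List<Rec> second, List<Integer> out) {
        out.add(1);
    }

    static List<Integer> run(List<Rec> data1, List<Rec> data2) {
        // Per key: index 0 holds records from data1, index 1 records from data2.
        Map<Integer, List<List<Rec>>> groups = new HashMap<>();
        for (Rec r : data1) {
            groups.computeIfAbsent(r.f0, k -> newGroup()).get(0).add(r);
        }
        for (Rec r : data2) {
            groups.computeIfAbsent(r.f0, k -> newGroup()).get(1).add(r);
        }
        List<Integer> out = new ArrayList<>();
        for (List<List<Rec>> g : groups.values()) {
            coGroup(g.get(0), g.get(1), out);
        }
        return out;
    }

    private static List<List<Rec>> newGroup() {
        List<List<Rec>> g = new ArrayList<>();
        g.add(new ArrayList<>());
        g.add(new ArrayList<>());
        return g;
    }
}
{code}

The output therefore has one element per distinct key that appears in either input, which is presumably what {{CountingAndDiscardingSink}} ends up counting.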
was:
DataSet has been deprecated and will be removed from Flink. However, DataStream
CoGroup is still considerably slower than DataSet when co-grouping two bounded
streams.
Here is the benchmark result of co-grouping two bounded streams with 4*10^6
records from each stream. The co-group function is chosen to be very
lightweight so that we only
> Support DataStream CoGroup in stream Mode with similar performance as DataSet
> CoGroup
> -------------------------------------------------------------------------------------
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: ml-2.3.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)