[
https://issues.apache.org/jira/browse/FLINK-6423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek updated FLINK-6423:
------------------------------------
Description:
JoinedStreams.WithWindow.apply(...) accepts a JoinFunction as a parameter, but
fails when given an equivalent lambda.
In the following code, runWithFunction completes successfully, while
runWithLambda throws "java.lang.ArrayIndexOutOfBoundsException: -1".
Although this might look like a very minor issue, the exception message is
unclear and can cost a developer considerable time tracking down the cause.
{code}
public void runWithFunction() throws Exception {
    StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<String> stream1 = env.fromElements("a", "b", "c");
    DataStream<String> stream2 = env.fromElements("A", "B", "C");
    DataStream<String> joined = stream1.join(stream2)
        .where(String::toLowerCase).equalTo(String::toLowerCase)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
        .apply(new JoinFunction<String, String, String>() {
            @Override
            public String join(String s1, String s2) {
                return s1 + "_" + s2;
            }
        });
    joined.print();
    env.execute();
}

public void runWithLambda() throws Exception {
    StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    DataStream<String> stream1 = env.fromElements("a", "b", "c");
    DataStream<String> stream2 = env.fromElements("A", "B", "C");
    DataStream<String> joined = stream1.join(stream2)
        .where(String::toLowerCase).equalTo(String::toLowerCase)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
        .apply((JoinFunction<String, String, String>) (s1, s2) -> s1 + "_" + s2);
    joined.print();
    env.execute();
}
{code}
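The issue does not state the root cause. One plausible contributing factor, offered here only as an assumption, is that a Java lambda does not expose its generic type arguments through reflection the way an anonymous class does, so any framework that infers types reflectively sees less information in the lambda case. The sketch below uses plain Java only; Joiner is a hypothetical stand-in for JoinFunction, and LambdaTypeInfoDemo and exposesTypeArguments are illustrative names, not Flink API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's JoinFunction<IN1, IN2, OUT>; not the real interface.
interface Joiner<A, B, R> {
    R join(A first, B second);
}

class LambdaTypeInfoDemo {

    // True if the object's class records concrete type arguments
    // for at least one interface it implements.
    static boolean exposesTypeArguments(Object function) {
        for (Type iface : function.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: compiled to a real class file that declares
    // "implements Joiner<String, String, String>".
    static Joiner<String, String, String> anonymousJoiner() {
        return new Joiner<String, String, String>() {
            @Override
            public String join(String s1, String s2) {
                return s1 + "_" + s2;
            }
        };
    }

    // Lambda: the class is generated at runtime and implements the raw interface.
    static Joiner<String, String, String> lambdaJoiner() {
        return (s1, s2) -> s1 + "_" + s2;
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + exposesTypeArguments(anonymousJoiner()));
        System.out.println("lambda: " + exposesTypeArguments(lambdaJoiner()));
    }
}
```

On a standard JDK, both objects join "a" and "A" into "a_A", but only the anonymous class is expected to report type arguments. Whether this is what leads to the ArrayIndexOutOfBoundsException in JoinedStreams.WithWindow.apply(...) is not confirmed by this issue.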
> JoinFunction can not be replaced with lambda
> --------------------------------------------
>
> Key: FLINK-6423
> URL: https://issues.apache.org/jira/browse/FLINK-6423
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0
> Reporter: Moshe Sayag
> Priority: Minor