[
https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Hueske updated FLINK-7919:
---------------------------------
Description:
A job with a delta iteration fails hard with an NPE in the solution set join if
the solution set has no entry for the join key of the probe side.
The following program reproduces the problem:
{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
    Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
    .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
    .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
            // modify the key so that the join probes a non-existent solution set key
            return Tuple2.of(value.f0 + 1, 1);
        }
    })
    .join(di.getSolutionSet()).where(0).equalTo(0)
    .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> join(
                Tuple2<Long, Integer> first,
                Tuple2<Long, Integer> second) throws Exception {
            return Tuple2.of(first.f0, first.f1 + second.f1);
        }
    });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}
It doesn't matter whether the solution set is managed or not.
The problem occurs because the solution set hash table prober returns {{null}}
if the solution set does not contain a value for the probe side key.
The join operator does not check whether the returned value is {{null}} but
immediately tries to create a copy using a {{TypeSerializer}}. This copy fails
with an NPE.
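The failure mode can be illustrated without Flink. The following is only a minimal sketch: a {{HashMap}} stands in for the solution set hash table, and the hypothetical {{copy}} method stands in for the serializer copy. None of these names come from the Flink code base.

```java
import java.util.HashMap;
import java.util.Map;

public class ProbeWithoutNullCheck {

    // Hypothetical stand-in for the serializer copy: it dereferences its
    // argument, so a null input raises a NullPointerException.
    static Integer copy(Integer value) {
        return Integer.valueOf(value.intValue());
    }

    // Looks up the probe-side key and copies the match without a null check.
    static Integer probeAndCopy(Map<Long, Integer> solutionSet, long probeKey) {
        Integer match = solutionSet.get(probeKey); // null if the key is absent
        return copy(match);
    }

    // Returns true if probing the given key fails with a NullPointerException.
    static boolean failsWithNpe(Map<Long, Integer> solutionSet, long probeKey) {
        try {
            probeAndCopy(solutionSet, probeKey);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Long, Integer> solutionSet = new HashMap<>();
        solutionSet.put(1L, 1);
        solutionSet.put(2L, 1);
        solutionSet.put(3L, 1);

        System.out.println(failsWithNpe(solutionSet, 2L)); // key is present
        System.out.println(failsWithNpe(solutionSet, 4L)); // key is absent
    }
}
```

In the reproducing program, the map function shifts every key by one, so the record with key 3 probes key 4, which has no entry in the solution set.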
I propose to check for {{null}} and call the join function with {{null}} on the
solution set side. This gives the join OUTER JOIN semantics.
Since programs that hit this case previously failed with an NPE, forwarding the
{{null}} into the {{JoinFunction}} cannot change the result of any job that
currently runs successfully.
However, users must be aware that the solution set value may be {{null}}, and we
need to update the documentation (JavaDocs + website) to describe the behavior.
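A rough sketch of the proposed behavior, again without Flink: the probe copies the match only if one exists and otherwise forwards {{null}}, and the user function has to handle that case. The class, the {{probe}} and {{join}} helpers, and the {{sumOrKeep}} function are hypothetical names used for illustration only. They are not the actual operator code or the final API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class NullAwareSolutionSetJoin {

    // Proposed behavior (sketch): copy the match only if it exists,
    // otherwise forward null to the caller.
    static Integer probe(Map<Long, Integer> solutionSet, long probeKey) {
        Integer match = solutionSet.get(probeKey);
        return match == null ? null : Integer.valueOf(match.intValue());
    }

    // Calls the user's join function; the second argument is null when the
    // solution set has no entry for the probe key (OUTER JOIN semantics).
    static Integer join(
            Map<Long, Integer> solutionSet,
            long probeKey,
            Integer probeValue,
            BiFunction<Integer, Integer, Integer> joinFunction) {
        return joinFunction.apply(probeValue, probe(solutionSet, probeKey));
    }

    // Example user function: keeps the probe value if there is no match,
    // otherwise adds the solution set value to it.
    static Integer sumOrKeep(Integer first, Integer second) {
        return second == null ? first : Integer.valueOf(first + second);
    }

    public static void main(String[] args) {
        Map<Long, Integer> solutionSet = new HashMap<>();
        solutionSet.put(1L, 1);
        solutionSet.put(2L, 1);
        solutionSet.put(3L, 1);

        // Key 2 has an entry, key 4 does not.
        System.out.println(join(solutionSet, 2L, 1, NullAwareSolutionSetJoin::sumOrKeep));
        System.out.println(join(solutionSet, 4L, 1, NullAwareSolutionSetJoin::sumOrKeep));
    }
}
```

A join function written like the one in the reproducing program, which reads {{second.f1}} unconditionally, would still fail in this case, only now inside user code. This is why the documentation update matters.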
was:
A job with a delta iteration fails hard with an NPE in the solution set join if
the solution set has no entry for the join key of the probe side.
The following program reproduces the problem:
{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
    Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
    .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
    .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
            // modify the key so that the join probes a non-existent solution set key
            return Tuple2.of(value.f0 + 1, 1);
        }
    })
    .join(di.getSolutionSet()).where(0).equalTo(0)
    .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
        @Override
        public Tuple2<Long, Integer> join(
                Tuple2<Long, Integer> first,
                Tuple2<Long, Integer> second) throws Exception {
            return Tuple2.of(first.f0, first.f1 + second.f1);
        }
    });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}
It doesn't matter whether the solution set is managed or not.
The problem occurs because the solution set hash table prober returns {{null}}
if the solution set does not contain a value for the probe side key.
The join operator does not check whether the returned value is {{null}} but
immediately tries to create a copy using a {{TypeSerializer}}. This copy fails
with an NPE.
There are two solutions:
1. Check for {{null}} and do not call the join function (INNER join semantics)
2. Check for {{null}} and call the join function with {{null}} on the solution
set side (OUTER join semantics)
Either way, the chosen behavior should be documented.
> Join with Solution Set fails with NPE if Solution Set has no entry
> ------------------------------------------------------------------
>
> Key: FLINK-7919
> URL: https://issues.apache.org/jira/browse/FLINK-7919
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, Local Runtime
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Fabian Hueske