Hi Dong,
> On Jan 4, 2024, at 10:18 PM, Dong Lin <[email protected]> wrote:
>
> Hi Ken,
>
> Sorry for the late reply. I didn't notice this email from you until now.
>
> In the scenario you described above, I don't think operator2 will see the
> result modified by operator1. Note that object re-use applies only to the
> transmission of data between operators in the same operator chain. But Flink
> won't put StreamX, operator1 and operator2 in the same operator chain when
> both operator1 and operator2 read the same output from StreamX.
>
> Would this answer your question?
Actually, operator2 will see the modified result.
The test case below illustrates this: it passes as written, but fails when object reuse is enabled.
— Ken
package com.scaleunlimited.flinksnippets;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.CloseableIterator;

import static org.junit.Assert.*;

import org.junit.Test;

public class ObjectReuseTest {

    @Test
    public void testObjectReuse() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1);
        // Uncomment to enable object reuse; the assertion on the value then fails.
        // env.getConfig().enableObjectReuse();

        DataStream<Event> stream1 = env.fromElements(
                new Event("A", 1));

        // First consumer of stream1: mutates the record in place.
        stream1.map((Event r) -> {
            r.setValue(r.getValue() * 2);
            return r;
        })
        .addSink(new DiscardingSink<>());

        // Second consumer of stream1: passes the record through unchanged.
        DataStream<Event> stream2 = stream1.map(r -> r);
        CloseableIterator<Event> results = stream2.collectAsync();

        env.execute();

        assertTrue(results.hasNext());
        Event result = results.next();
        // With object reuse disabled, the second consumer sees the original value.
        assertEquals(1, result.getValue());
        assertFalse(results.hasNext());
    }

    public static class Event {
        private String label;
        private long value;

        public Event() {}

        public Event(String label, long value) {
            this.label = label;
            this.value = value;
        }

        public String getLabel() {
            return label;
        }

        public void setLabel(String label) {
            this.label = label;
        }

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }
}
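
For reference, here's a sketch of a companion test method that could sit next to testObjectReuse() in the same class, showing what I'd expect with object reuse enabled. The method name and the expected value of 2 are my assumptions about the failure mode (the mutating map running before the pass-through map), so treat it as an illustration rather than something I've verified across Flink versions:

    @Test
    public void testObjectReuseEnabled() throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1);
        // Records are now passed by reference between chained operators.
        env.getConfig().enableObjectReuse();

        DataStream<Event> stream1 = env.fromElements(new Event("A", 1));

        // First consumer mutates the shared record in place.
        stream1.map((Event r) -> {
            r.setValue(r.getValue() * 2);
            return r;
        })
        .addSink(new DiscardingSink<>());

        // Second consumer observes the mutated record when object reuse is on.
        DataStream<Event> stream2 = stream1.map(r -> r);
        CloseableIterator<Event> results = stream2.collectAsync();

        env.execute();

        assertTrue(results.hasNext());
        // Assumed outcome under object reuse: the doubled value leaks across consumers.
        assertEquals(2, results.next().getValue());
        assertFalse(results.hasNext());
    }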
>
>
>
> On Fri, Oct 20, 2023 at 7:26 AM Ken Krugler <[email protected]> wrote:
>> Hi Dong,
>>
>> Sorry for not seeing this initially. I did have one question about the
>> description of the issue in the FLIP:
>>
>>> However, in cases where the upstream and downstream operators do not store
>>> or access references to the input or output records, this deep-copy
>>> overhead becomes unnecessary
>>
>> I was interested in getting clarification as to what you meant by “or access
>> references…”, to see if it covered this situation:
>>
>> StreamX —forward--> operator1
>> StreamX —forward--> operator2
>>
>> If operator1 modifies the record, and object re-use is enabled, then
>> operator2 will see the modified version, right?
>>
>> Thanks,
>>
>> — Ken
>>
>>> On Jul 2, 2023, at 7:24 PM, Xuannan Su <[email protected]> wrote:
>>>
>>> Hi all,
>>>
>>> Dong (cc'ed) and I are opening this thread to discuss our proposal to
>>> add an operator attribute that lets operators declare support for
>>> object reuse [1].
>>>
>>> Currently, pipeline.object-reuse defaults to false to avoid data
>>> corruption, at the cost of suboptimal performance. We propose adding
>>> APIs that operators can use to tell the Flink runtime whether it is
>>> safe to reuse the records they emit. This enhancement would let Flink
>>> maximize its performance with the default configuration.
>>>
>>> Please refer to the FLIP document for more details about the proposed
>>> design and implementation. We welcome any feedback and opinions on
>>> this proposal.
>>>
>>> Best regards,
>>>
>>> Dong and Xuannan
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255073749
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink & Pinot
>>
>>
>>
--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink & Pinot