I think the mutation detector could be updated to use the coder's
structural value and the coder could then provide a structural value which
wraps the message and does the equality comparison however it chooses.
https://github.com/apache/beam/blob/01b3f87f977d44eac23eb5488074bbc638858a9d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L252
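
A rough, self-contained sketch of that idea, for illustration only. It is not Beam code: OpaqueMessage, EncodedStructuralValue, encode() and unmodified() are hypothetical stand-ins, and the "coder" here simply encodes the message body as UTF-8. The point is that the wrapper, not the message class, decides what equality means.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StructuralValueSketch {

    // Hypothetical stand-in for a message class that does not override
    // equals(), like the Message discussed in this thread.
    static final class OpaqueMessage {
        String body;

        OpaqueMessage(String body) {
            this.body = body;
        }
    }

    // Wrapper whose equality is defined by the encoded bytes rather than
    // by the wrapped object's own (identity-based) equals().
    static final class EncodedStructuralValue {
        private final byte[] encoded;

        EncodedStructuralValue(byte[] encoded) {
            this.encoded = encoded.clone();
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof EncodedStructuralValue
                && Arrays.equals(encoded, ((EncodedStructuralValue) other).encoded);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(encoded);
        }
    }

    // Assumed encoding for this sketch: the body as UTF-8 bytes.
    static byte[] encode(OpaqueMessage message) {
        return message.body.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical analogue of a coder handing out a structural value.
    static Object structuralValue(OpaqueMessage message) {
        return new EncodedStructuralValue(encode(message));
    }

    // How a detector could compare two values via their structural values.
    static boolean unmodified(OpaqueMessage original, OpaqueMessage current) {
        return structuralValue(original).equals(structuralValue(current));
    }

    public static void main(String[] args) {
        OpaqueMessage original = new OpaqueMessage("test");
        OpaqueMessage copy = new OpaqueMessage("test");

        System.out.println(original.equals(copy));      // identity-based equals()
        System.out.println(unmodified(original, copy)); // structural comparison

        copy.body = "changed";
        System.out.println(unmodified(original, copy)); // after a mutation
    }
}
```

With something like this, a class without a usable equals() can still be compared meaningfully, provided the encoding is deterministic.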

On Tue, Jun 20, 2017 at 8:16 AM, Lukasz Cwik <[email protected]> wrote:

> Either Java object equality or its coder needs to be deterministic for
> that check to hold.
>
> On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax <[email protected]>
> wrote:
>
>> Hi,
>>
>> That is only a fast path. If equals returns false, it then encodes the
>> values to a byte array and checks the byte array for equality. So as long
>> as you have a correct coder, this should work.
>>
>> On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> > Hi Kenn,
>> >
>> > I checked in MutationDetectors, and we use the
>> > CodedValueMutationDetector(T value, Coder<T> coder).
>> >
>> > To verify mutation, we use the verifyUnmodified() method calling
>> > verifyUnmodifiedThrowingCheckedExceptions().
>> >
>> > In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we
>> > do:
>> >
>> >       if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
>> >           || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
>> >         return;
>> >       }
>> >
>> > (populated with CoderUtils).
>> >
>> > So, in a test, I mimic the MutationDetector doing:
>> >
>> >     Message message = Message.Factory.create();
>> >     message.setBody(new AmqpValue("test"));
>> >     byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
>> >     Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
>> >     assertTrue(Objects.equals(message, clone));
>> >     assertTrue(Objects.equals(clone, message));
>> >
>> > And it fails as Message doesn't provide a custom equals() method.
>> >
>> > So, I guess the only way is to extend Message to provide an equals()
>> > method.
>> >
>> > I'm experimenting with this now.
>> >
>> > Regards
>> > JB
>> >
>> > On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
>> >
>> >> Last I checked, equals() was used only as a shortcut. If the two are not
>> >> equals() then their encoded forms should be checked. If neither equals()
>> >> nor the coder can work for this, you will have a bad time.
>> >>
>> >> On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <[email protected]>
>> >> wrote:
>> >>
>> >>> Hi team,
>> >>>
>> >>> The direct runner checks that there's no mutation on elements in a
>> >>> PCollection thanks to ImmutabilityEnforcementFactory.
>> >>> This factory uses CodedValueMutationDetector to detect if an element has
>> >>> been changed or not.
>> >>>
>> >>> The CodedValueMutationDetector uses equals (in the
>> >>> verifyUnmodifiedThrowingCheckedExceptions() method) for that.
>> >>>
>> >>> However, in an IO I'm working on, the element class doesn't
>> >>> override equals() and it fails with:
>> >>>
>> >>> org.apache.beam.sdk.util.IllegalMutationException: PTransform
>> >>> AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value
>> >>> Message{body=AmqpValue{Test 0}} of class class
>> >>> org.apache.qpid.proton.message.impl.MessageImpl. Input values must not be
>> >>> mutated in any way.
>> >>>
>> >>> So, basically my question is:
>> >>>
>> >>> 1. Do I need to wrap the message in a custom wrapper overriding the
>> >>> equals() method?
>> >>> 2. Maybe we could improve the checker in the direct runner a bit?
>> >>>
>> >>> Thanks
>> >>> Regards
>> >>> JB
>> >>> --
>> >>> Jean-Baptiste Onofré
>> >>> [email protected]
>> >>> http://blog.nanthrax.net
>> >>> Talend - http://www.talend.com
>> >>>
>> >>>
>> >>
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>
>
>
