I think the mutation detector could be updated to compare the coder's structural values (Coder#structuralValue). The coder could then provide a structural value that wraps the message and implements the equality comparison however it chooses: https://github.com/apache/beam/blob/01b3f87f977d44eac23eb5488074bbc638858a9d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L252
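A minimal, self-contained sketch of that suggestion follows. It assumes a cut-down coder interface rather than Beam's real `Coder` class, and `Msg`, `SketchCoder`, `MsgCoder` and `StructuralMutationDetector` are all hypothetical names. The point it shows: the element type has no `equals()`, yet mutation detection still works because the coder's structural value supplies the equality.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/** Hypothetical message type that, like Proton's MessageImpl, does not override equals(). */
class Msg {
  String body;

  Msg(String body) {
    this.body = body;
  }
}

/** Hypothetical, cut-down coder; Beam's real Coder has a larger API. */
interface SketchCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);

  /** Returns an object whose equals()/hashCode() define "the same value" for T. */
  Object structuralValue(T value);
}

/** Hypothetical coder for Msg whose structural value compares messages by body. */
class MsgCoder implements SketchCoder<Msg> {
  @Override
  public byte[] encode(Msg m) {
    return m.body.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public Msg decode(byte[] bytes) {
    return new Msg(new String(bytes, StandardCharsets.UTF_8));
  }

  @Override
  public Object structuralValue(Msg m) {
    // String is immutable, so holding the reference snapshots the current body.
    return new BodyKey(m.body);
  }

  /** Wrapper that supplies the equality Msg itself lacks. */
  private static final class BodyKey {
    private final String body;

    BodyKey(String body) {
      this.body = body;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof BodyKey && Objects.equals(body, ((BodyKey) o).body);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(body);
    }
  }
}

/** Hypothetical detector that compares structural values instead of the element's equals(). */
class StructuralMutationDetector<T> {
  private final SketchCoder<T> coder;
  private final T possiblyModified;
  private final Object originalStructuralValue;

  StructuralMutationDetector(T value, SketchCoder<T> coder) {
    this.coder = coder;
    this.possiblyModified = value;
    // Take a deep copy via encode/decode, then record the copy's structural value.
    T clone = coder.decode(coder.encode(value));
    this.originalStructuralValue = coder.structuralValue(clone);
  }

  /** True if the element still has the same structural value as when it was captured. */
  boolean isUnmodified() {
    return originalStructuralValue.equals(coder.structuralValue(possiblyModified));
  }
}
```

With this shape the IO would not need to subclass or wrap Message just to satisfy the direct runner; the equality decision lives in the coder, next to the encoding it already owns.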
On Tue, Jun 20, 2017 at 8:16 AM, Lukasz Cwik <[email protected]> wrote:
> Either Java object equality or its coder needs to be deterministic for
> that check to hold.
>
> On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax <[email protected]> wrote:
>
>> Hi,
>>
>> That is only a fast path. If equals returns false, it then encodes the
>> values to a byte array and checks the byte array for equality. So as long
>> as you have a correct coder, this should work.
>>
>> On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>
>>> Hi Kenn,
>>>
>>> I checked in MutationDetectors, and we use the
>>> CodedValueMutationDetector(T value, Coder<T> coder).
>>>
>>> To verify mutation, we use the verifyUnmodified() method calling
>>> verifyUnmodifiedThrowingCheckedExceptions().
>>>
>>> In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we
>>> do:
>>>
>>>   if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
>>>       || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
>>>     return;
>>>   }
>>>
>>> (populated with CoderUtils).
>>>
>>> So, in a test, I mimic the MutationDetector doing:
>>>
>>>   Message message = Message.Factory.create();
>>>   message.setBody(new AmqpValue("test"));
>>>   byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
>>>   Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
>>>   assertTrue(Objects.equals(message, clone));
>>>   assertTrue(Objects.equals(clone, message));
>>>
>>> And it fails as Message doesn't provide a custom equals() method.
>>>
>>> So, I guess the only way is to extend Message to provide an equals() method.
>>>
>>> I'm experimenting with this now.
>>>
>>> Regards
>>> JB
>>>
>>> On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
>>>
>>>> Last I checked, equals() was used only as a shortcut. If the two are not
>>>> equals() then their encoded forms should be checked. If neither equals()
>>>> nor the coder can work for this, you will have a bad time.
>>>>
>>>> On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> The direct runner checks that there's no mutation on elements in a
>>>>> PCollection thanks to ImmutabilityEnforcementFactory.
>>>>> This factory uses CodedValueMutationDetector to detect if an element has
>>>>> been changed or not.
>>>>>
>>>>> The CodedValueMutationDetector uses equals (in the
>>>>> verifyUnmodifiedThrowingCheckedExceptions() method) for that.
>>>>>
>>>>> However, in an IO I'm working on, the element class doesn't
>>>>> override equals and it fails with:
>>>>>
>>>>> org.apache.beam.sdk.util.IllegalMutationException: PTransform
>>>>> AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value
>>>>> Message{body=AmqpValue{Test 0}} of class class
>>>>> org.apache.qpid.proton.message.impl.MessageImpl. Input values must not be
>>>>> mutated in any way.
>>>>>
>>>>> So, basically my question is:
>>>>>
>>>>> 1. Do I need to wrap the message in a custom wrapper overriding the
>>>>>    equals() method?
>>>>> 2. Maybe we could improve the checker in the direct runner a bit?
>>>>>
>>>>> Thanks
>>>>> Regards
>>>>> JB
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> [email protected]
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
