Either Java object equality or its coder needs to be deterministic for that check to hold.
On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax <re...@google.com.invalid> wrote: > Him > > That is only a fast path. If equals returns false, it then encodes the > values to a byte array and checks the byte array for equality. So as long > as you havev a correct coder, this should work. > > On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré <j...@nanthrax.net> > wrote: > > > Hi Kenn, > > > > I checked in MutationDetectors, and we use the > > CodedValueMutationDetector(T value, Coder<T> coder). > > > > To verify mutation, we use the verifyUnmodified() method calling > > verifyUnmodifiedThrowingCheckedExceptions(). > > > > In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we > > do: > > > > if (Objects.equals(possiblyModifiedObject, clonedOriginalObject) > > || Objects.equals(clonedOriginalObject, > > possiblyModifiedObject)) { > > return; > > } > > > > (populated with CoderUtils). > > > > So, in a test, I mimic the MutationDetector doing: > > > > Message message = Message.Factory.create(); > > message.setBody(new AmqpValue("test")); > > byte[] encoded = CoderUtils.encodeToByteArray(coder, message); > > Message clone = CoderUtils.decodeFromByteArray(coder, encoded); > > assertTrue(Objects.equals(message, clone)); > > assertTrue(Objects.equals(clone, message)); > > > > And it fails as Message doesn't provide a custom equals() method. > > > > So, I guess the only way is to extend Message to provide equals method. > > > > I'm experimenting this now. > > > > Regards > > JB > > > > On 06/19/2017 07:57 AM, Kenneth Knowles wrote: > > > >> Last I checked, equals() was used only as a shortcut. If the two are not > >> equals() then their encoded forms should be checked. If neither equals() > >> nor the coder can work for this, you will have a bad time. > >> > >> On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <j...@nanthrax.net > > > >> wrote: > >> > >> Hi team, > >>> > >>> The direct runner checks that there's no mutation on elements in a > >>> PCollection thanks to ImmutabilityEnforcementFactory. > >>> This factory uses CodedValueMutationDetector to detect if an element > has > >>> been changed or not. > >>> > >>> The CodedValueMutationDetector uses equals (in the > >>> verifyUnmodifiedThrowingCheckedExceptions() method) for that. > >>> > >>> However, in an IO on which I'm working on, the element class doesn't > >>> override equals and it fails with: > >>> > >>> org.apache.beam.sdk.util.IllegalMutationException: PTransform > >>> AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value > >>> Message{body=AmqpValue{Test 0}} of class class > >>> org.apache.qpid.proton.message.impl.MessageImpl. Input values must not > >>> be > >>> mutated in any way. > >>> > >>> So, basically my question is: > >>> > >>> 1. Do I need to wrap the message in a custom wrapper overriding the > >>> equals() method ? > >>> 2. Maybe we could improve a bit the checker in the direct runner ? > >>> > >>> Thanks > >>> Regards > >>> JB > >>> -- > >>> Jean-Baptiste Onofré > >>> jbono...@apache.org > >>> http://blog.nanthrax.net > >>> Talend - http://www.talend.com > >>> > >>> > >> > > -- > > Jean-Baptiste Onofré > > jbono...@apache.org > > http://blog.nanthrax.net > > Talend - http://www.talend.com > > >