Filed https://issues.apache.org/jira/browse/BEAM-2482 for updating
CodedValueMutationDetector
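
To make the intended change concrete, here is a rough sketch of the check
order discussed in the thread below: equals() first as the fast path, then
the coder's structural value, whose default falls back to comparing the full
encoding. It is plain Java with hypothetical stand-ins (SketchCoder,
EncodedBytes, Msg, unmodified, MSG_CODER); it is not Beam's actual Coder or
CodedValueMutationDetector code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

/** Sketch only: every name here is hypothetical, not one of Beam's classes. */
public class MutationCheckSketch {

  /** Stand-in for the part of a coder that the check needs. */
  public interface SketchCoder<T> {
    byte[] encode(T value);

    /** Default: wrap the encoded bytes, i.e. fall back to full encoding. */
    default Object structuralValue(T value) {
      return new EncodedBytes(encode(value));
    }
  }

  /** Gives a byte array value-based equals() and hashCode(). */
  public static final class EncodedBytes {
    private final byte[] bytes;

    public EncodedBytes(byte[] bytes) {
      this.bytes = bytes.clone();
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof EncodedBytes && Arrays.equals(bytes, ((EncodedBytes) o).bytes);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(bytes);
    }
  }

  /** Element type that, like the Message in this thread, does not override equals(). */
  public static final class Msg {
    public String body;

    public Msg(String body) {
      this.body = body;
    }
  }

  /** Assumed coder for Msg: encodes only the body as UTF-8. */
  public static final SketchCoder<Msg> MSG_CODER =
      m -> m.body.getBytes(StandardCharsets.UTF_8);

  public static <T> boolean unmodified(T original, T current, SketchCoder<T> coder) {
    // Fast path: cheap when the element type has a meaningful equals().
    if (Objects.equals(original, current)) {
      return true;
    }
    // Otherwise compare structural values; a coder may override
    // structuralValue() to compare however it chooses.
    return Objects.equals(coder.structuralValue(original), coder.structuralValue(current));
  }

  public static void main(String[] args) {
    Msg original = new Msg("test");
    Msg copy = new Msg("test");
    System.out.println(unmodified(original, copy, MSG_CODER));
    copy.body = "changed";
    System.out.println(unmodified(original, copy, MSG_CODER));
  }
}
```

A coder with a cheaper notion of equality could override structuralValue()
in this sketch and skip the encoding step entirely.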

On Tue, Jun 20, 2017 at 8:29 AM, Kenneth Knowles <[email protected]>
wrote:

> Replacing full encoding with structural value is a good way to provide an
> opportunity for a fast path. File a starter JIRA?
>
> The equals check should be retained since it will sometimes be even faster,
> and structural value falls back to full encoding.
>
> On Tue, Jun 20, 2017 at 8:19 AM, Lukasz Cwik <[email protected]>
> wrote:
>
> > I think the mutation detector could be updated to use the coder's
> > structural value and the coder could then provide a structural value which
> > wraps the message and does the equality comparison however it chooses.
> > https://github.com/apache/beam/blob/01b3f87f977d44eac23eb5488074bbc638858a9d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L252
> >
> > On Tue, Jun 20, 2017 at 8:16 AM, Lukasz Cwik <[email protected]> wrote:
> >
> > > Either Java object equality or its coder needs to be deterministic for
> > > that check to hold.
> > >
> > > On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax <[email protected]>
> > > wrote:
> > >
> > > >> Hmm
> > >>
> > > >> That is only a fast path. If equals returns false, it then encodes the
> > > >> values to byte arrays and checks the byte arrays for equality. So as long
> > > >> as you have a correct coder, this should work.
> > >>
> > > >> On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré <[email protected]>
> > > >> wrote:
> > >>
> > >> > Hi Kenn,
> > >> >
> > >> > I checked in MutationDetectors, and we use the
> > >> > CodedValueMutationDetector(T value, Coder<T> coder).
> > >> >
> > >> > To verify mutation, we use the verifyUnmodified() method calling
> > >> > verifyUnmodifiedThrowingCheckedExceptions().
> > >> >
> > > >> > In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we
> > > >> > do:
> > >> >
> > > >> >       if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
> > > >> >           || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
> > >> >         return;
> > >> >       }
> > >> >
> > > >> > (where clonedOriginalObject is populated with CoderUtils).
> > >> >
> > > >> > So, in a test, I mimicked the MutationDetector by doing:
> > >> >
> > >> >     Message message = Message.Factory.create();
> > >> >     message.setBody(new AmqpValue("test"));
> > >> >     byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
> > >> >     Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
> > >> >     assertTrue(Objects.equals(message, clone));
> > >> >     assertTrue(Objects.equals(clone, message));
> > >> >
> > >> > And it fails as Message doesn't provide a custom equals() method.
> > >> >
> > > >> > So, I guess the only way is to extend Message to provide an equals()
> > > >> > method.
> > > >> >
> > > >> > I'm experimenting with this now.
> > >> >
> > >> > Regards
> > >> > JB
> > >> >
> > >> > On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
> > >> >
> > > >> >> Last I checked, equals() was used only as a shortcut. If the two are not
> > > >> >> equals() then their encoded forms should be checked. If neither equals()
> > > >> >> nor the coder can work for this, you will have a bad time.
> > >> >>
> > > >> >> On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <[email protected]>
> > > >> >> wrote:
> > >> >>
> > > >> >>> Hi team,
> > >> >>>
> > >> >>> The direct runner checks that there's no mutation on elements in a
> > >> >>> PCollection thanks to ImmutabilityEnforcementFactory.
> > > >> >>> This factory uses CodedValueMutationDetector to detect if an element has
> > > >> >>> been changed or not.
> > >> >>>
> > >> >>> The CodedValueMutationDetector uses equals (in the
> > >> >>> verifyUnmodifiedThrowingCheckedExceptions() method) for that.
> > >> >>>
> > > >> >>> However, in an IO that I'm working on, the element class doesn't
> > > >> >>> override equals and it fails with:
> > >> >>>
> > >> >>> org.apache.beam.sdk.util.IllegalMutationException: PTransform
> > > >> >>> AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value
> > > >> >>> Message{body=AmqpValue{Test 0}} of class class
> > > >> >>> org.apache.qpid.proton.message.impl.MessageImpl. Input values must not
> > > >> >>> be mutated in any way.
> > >> >>>
> > >> >>> So, basically my question is:
> > >> >>>
> > > >> >>> 1. Do I need to wrap the message in a custom wrapper overriding the
> > > >> >>> equals() method?
> > > >> >>> 2. Maybe we could improve the checker in the direct runner a bit?
> > >> >>>
> > >> >>> Thanks
> > >> >>> Regards
> > >> >>> JB
> > >> >>> --
> > >> >>> Jean-Baptiste Onofré
> > >> >>> [email protected]
> > >> >>> http://blog.nanthrax.net
> > >> >>> Talend - http://www.talend.com
> > >> >>>
> > >> >>>
> > >> >>
> > >> > --
> > >> > Jean-Baptiste Onofré
> > >> > [email protected]
> > >> > http://blog.nanthrax.net
> > >> > Talend - http://www.talend.com
> > >> >
> > >>
> > >
> > >
> >
>

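On the first option from the original question (a wrapper that overrides
equals()), a minimal sketch might look like the following. RawMessage is a
hypothetical stand-in for a type such as the AMQP Message, and comparing only
the body is an assumption made for illustration; a real wrapper would need to
compare whatever fields matter.

```java
import java.util.Objects;

/** Sketch only: RawMessage and ComparableMessage are hypothetical names. */
public class EqualsWrapperSketch {

  /** Stand-in for a third-party message type that does not override equals(). */
  public static final class RawMessage {
    public String body;

    public RawMessage(String body) {
      this.body = body;
    }
  }

  /** Wrapper that defines equality in terms of the wrapped message's body. */
  public static final class ComparableMessage {
    private final RawMessage message;

    public ComparableMessage(RawMessage message) {
      this.message = Objects.requireNonNull(message);
    }

    public RawMessage get() {
      return message;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof ComparableMessage)) {
        return false;
      }
      return Objects.equals(message.body, ((ComparableMessage) o).message.body);
    }

    @Override
    public int hashCode() {
      // Must agree with equals(): derived from the same field.
      return Objects.hashCode(message.body);
    }
  }
}
```

As the replies note, equals() is only the fast path, so a wrapper like this
should matter mainly when the coder's encoding cannot be relied on.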