Filed https://issues.apache.org/jira/browse/BEAM-2482 for updating CodedValueMutationDetector
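
For readers following the thread below, here is a rough sketch of the check order under discussion: keep the equals() fast path, then compare structural values, which by default fall back to the fully encoded bytes. It is plain Java with a hypothetical SketchCoder interface standing in for Beam's Coder, so it runs without Beam on the classpath. It is not the actual CodedValueMutationDetector code; the names SketchCoder, EncodedBytes, Payload and isUnmodified are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

public class MutationCheckSketch {

  /** Hypothetical stand-in for a coder; NOT the real Beam Coder API. */
  interface SketchCoder<T> {
    byte[] encode(T value);

    /**
     * Assumed default: wrap the encoded bytes, so a coder that does not
     * override this still gets a byte-for-byte comparison.
     */
    default Object structuralValue(T value) {
      return new EncodedBytes(encode(value));
    }
  }

  /** Wraps a byte array so that equals() compares contents, not identity. */
  static final class EncodedBytes {
    private final byte[] bytes;

    EncodedBytes(byte[] bytes) {
      this.bytes = bytes;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof EncodedBytes && Arrays.equals(bytes, ((EncodedBytes) o).bytes);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(bytes);
    }
  }

  /** Element type that does not override equals(), like Message in the thread. */
  static final class Payload {
    String body;

    Payload(String body) {
      this.body = body;
    }
  }

  /** Returns true if current is considered unchanged relative to original. */
  static <T> boolean isUnmodified(T original, T current, SketchCoder<T> coder) {
    // Fast path: cheap when the element type has a meaningful equals().
    if (Objects.equals(original, current)) {
      return true;
    }
    // Slow path: compare structural values (encoded bytes unless overridden).
    return Objects.equals(coder.structuralValue(original), coder.structuralValue(current));
  }

  public static void main(String[] args) {
    SketchCoder<Payload> coder = p -> p.body.getBytes(StandardCharsets.UTF_8);

    Payload original = new Payload("test");
    Payload copy = new Payload("test");

    // equals() is identity-based here, but the structural values match.
    System.out.println(isUnmodified(original, copy, coder)); // prints "true"

    copy.body = "changed";
    System.out.println(isUnmodified(original, copy, coder)); // prints "false"
  }
}
```

A coder for a type such as Message could override structuralValue to return a cheaper wrapper with its own equals(), which is the opportunity for a faster check mentioned in the replies.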

On Tue, Jun 20, 2017 at 8:29 AM, Kenneth Knowles <[email protected]> wrote:

> Replacing full encoding with structural value is a good way to provide an
> opportunity for a fast path. File a starter JIRA?
>
> The equals check should be retained since it will sometimes be even faster,
> and structural value falls back to full encoding.
>
> On Tue, Jun 20, 2017 at 8:19 AM, Lukasz Cwik <[email protected]> wrote:
>
> > I think the mutation detector could be updated to use the coder's
> > structural value and the coder could then provide a structural value which
> > wraps the message and does the equality comparison however it chooses.
> > https://github.com/apache/beam/blob/01b3f87f977d44eac23eb5488074bbc638858a9d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L252
> >
> > On Tue, Jun 20, 2017 at 8:16 AM, Lukasz Cwik <[email protected]> wrote:
> >
> > > Either Java object equality or its coder needs to be deterministic for
> > > that check to hold.
> > >
> > > On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax <[email protected]> wrote:
> > >
> > >> Hmm,
> > >>
> > >> That is only a fast path. If equals returns false, it then encodes the
> > >> values to a byte array and checks the byte array for equality. So as long
> > >> as you have a correct coder, this should work.
> > >>
> > >> On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> > >>
> > >> > Hi Kenn,
> > >> >
> > >> > I checked in MutationDetectors, and we use the
> > >> > CodedValueMutationDetector(T value, Coder<T> coder).
> > >> >
> > >> > To verify mutation, we use the verifyUnmodified() method calling
> > >> > verifyUnmodifiedThrowingCheckedExceptions().
> > >> >
> > >> > In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we do:
> > >> >
> > >> >     if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
> > >> >         || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
> > >> >       return;
> > >> >     }
> > >> >
> > >> > (populated with CoderUtils).
> > >> >
> > >> > So, in a test, I mimic the MutationDetector doing:
> > >> >
> > >> >     Message message = Message.Factory.create();
> > >> >     message.setBody(new AmqpValue("test"));
> > >> >     byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
> > >> >     Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
> > >> >     assertTrue(Objects.equals(message, clone));
> > >> >     assertTrue(Objects.equals(clone, message));
> > >> >
> > >> > And it fails as Message doesn't provide a custom equals() method.
> > >> >
> > >> > So, I guess the only way is to extend Message to provide an equals() method.
> > >> >
> > >> > I'm experimenting with this now.
> > >> >
> > >> > Regards
> > >> > JB
> > >> >
> > >> > On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
> > >> >
> > >> >> Last I checked, equals() was used only as a shortcut. If the two are not
> > >> >> equals() then their encoded forms should be checked. If neither equals()
> > >> >> nor the coder can work for this, you will have a bad time.
> > >> >>
> > >> >> On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <[email protected]> wrote:
> > >> >>
> > >> >>> Hi team,
> > >> >>>
> > >> >>> The direct runner checks that there's no mutation on elements in a
> > >> >>> PCollection thanks to ImmutabilityEnforcementFactory.
> > >> >>> This factory uses CodedValueMutationDetector to detect if an element has
> > >> >>> been changed or not.
> > >> >>>
> > >> >>> The CodedValueMutationDetector uses equals (in the
> > >> >>> verifyUnmodifiedThrowingCheckedExceptions() method) for that.
> > >> >>>
> > >> >>> However, in an IO I'm working on, the element class doesn't
> > >> >>> override equals and it fails with:
> > >> >>>
> > >> >>> org.apache.beam.sdk.util.IllegalMutationException: PTransform
> > >> >>> AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value
> > >> >>> Message{body=AmqpValue{Test 0}} of class class
> > >> >>> org.apache.qpid.proton.message.impl.MessageImpl. Input values must not be
> > >> >>> mutated in any way.
> > >> >>>
> > >> >>> So, basically my questions are:
> > >> >>>
> > >> >>> 1. Do I need to wrap the message in a custom wrapper overriding the
> > >> >>> equals() method?
> > >> >>> 2. Maybe we could improve the checker in the direct runner a bit?
> > >> >>>
> > >> >>> Thanks
> > >> >>> Regards
> > >> >>> JB
> > >> >>> --
> > >> >>> Jean-Baptiste Onofré
> > >> >>> [email protected]
> > >> >>> http://blog.nanthrax.net
> > >> >>> Talend - http://www.talend.com
> > >> >
> > >> > --
> > >> > Jean-Baptiste Onofré
> > >> > [email protected]
> > >> > http://blog.nanthrax.net
> > >> > Talend - http://www.talend.com
