Hi Kenn,

I checked in MutationDetectors, and we use the CodedValueMutationDetector(T value, Coder<T> coder).

To verify mutation, we use the verifyUnmodified() method calling verifyUnmodifiedThrowingCheckedExceptions().

In the verifyUnmodifiedThrowingCheckedExceptions() method, basically, we do:

      if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
          || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
        return;
      }

(populated with CoderUtils).

So, in a test, I mimic the MutationDetector doing:

    Message message = Message.Factory.create();
    message.setBody(new AmqpValue("test"));
    byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
    Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
    assertTrue(Objects.equals(message, clone));
    assertTrue(Objects.equals(clone, message));

And it fails as Message doesn't provide a custom equals() method.

So, I guess the only way is to extend Message to provide equals method.

I'm experimenting this now.

Regards
JB

On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
Last I checked, equals() was used only as a shortcut. If the two are not
equals() then their encoded forms should be checked. If neither equals()
nor the coder can work for this, you will have a bad time.

On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré <[email protected]>
wrote:

Hi team,

The direct runner checks that there's no mutation on elements in a
PCollection thanks to ImmutabilityEnforcementFactory.
This factory uses CodedValueMutationDetector to detect if an element has
been changed or not.

The CodedValueMutationDetector uses equals (in the
verifyUnmodifiedThrowingCheckedExceptions() method) for that.

However, in an IO on which I'm working on, the element class doesn't
override equals and it fails with:

org.apache.beam.sdk.util.IllegalMutationException: PTransform
AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value
Message{body=AmqpValue{Test 0}} of class class
org.apache.qpid.proton.message.impl.MessageImpl. Input values must not be
mutated in any way.

So, basically my question is:

1. Do I need to wrap the message in a custom wrapper overriding the
equals() method ?
2. Maybe we could improve a bit the checker in the direct runner ?

Thanks
Regards
JB
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to