Thanks guys for the details and update.
I'm updating my WriteFn and Coder accordingly.
Regards
JB
On 06/20/2017 05:52 PM, Lukasz Cwik wrote:
Filed https://issues.apache.org/jira/browse/BEAM-2482 for updating CodedValueMutationDetector.
On Tue, Jun 20, 2017 at 8:29 AM, Kenneth Knowles wrote:
Replacing the full encoding with the structural value is a good way to provide an opportunity for a fast path. File a starter JIRA?
The equals check should be retained, since it will sometimes be even faster, and the structural value falls back to the full encoding.
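As a rough illustration of the ordering described above (equals() first, then the structural value, which falls back to the encoding), here is a minimal, self-contained sketch. SketchCoder, EncodedBytes and StructuralCheck are hypothetical names, not Beam API, and the default structuralValue() here simply wraps the encoded bytes so that equals() compares content; that is an assumption about how such a fallback can look, not Beam's actual code.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical minimal coder interface standing in for Beam's Coder<T>.
interface SketchCoder<T> {
  byte[] encode(T value);

  // Assumed default: the structural value is the full encoding, wrapped so
  // that equals() compares bytes instead of array identity.
  default Object structuralValue(T value) {
    return new EncodedBytes(encode(value));
  }
}

// Hypothetical wrapper giving a byte[] content-based equals() and hashCode().
final class EncodedBytes {
  private final byte[] bytes;

  EncodedBytes(byte[] bytes) {
    this.bytes = bytes.clone();
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof EncodedBytes && Arrays.equals(bytes, ((EncodedBytes) o).bytes);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }
}

final class StructuralCheck {
  // Returns true when the two values should be treated as unmodified.
  static <T> boolean looksUnmodified(T original, T current, SketchCoder<T> coder) {
    // Fast path kept: equals() may be cheaper than computing structural values.
    if (Objects.equals(original, current)) {
      return true;
    }
    // Slower path: compare structural values (here, the wrapped encodings).
    return Objects.equals(coder.structuralValue(original), coder.structuralValue(current));
  }
}
```

For a type such as StringBuilder, which keeps identity equals(), the fast path fails and the comparison falls through to the encoded bytes.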
On Tue, Jun 20, 2017 at 8:19 AM, Lukasz Cwik wrote:
I think the mutation detector could be updated to use the coder's structural value, and the coder could then provide a structural value which wraps the message and does the equality comparison however it chooses.
https://github.com/apache/beam/blob/01b3f87f977d44eac23eb5488074bbc638858a9d/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L252
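A minimal sketch of this suggestion, assuming a hypothetical mutable Msg type with identity equals() (standing in for the Proton MessageImpl) and a hypothetical MsgCoder whose structuralValue() wraps the message and compares bodies. None of these classes are Beam or Qpid Proton API; in Beam the equivalent would be the coder's structural value hook linked above.

```java
import java.util.Objects;

// Hypothetical mutable message type; like MessageImpl in this thread it does
// not override equals(), so two messages with the same body are not equal.
final class Msg {
  private String body;

  Msg(String body) { this.body = body; }

  String getBody() { return body; }

  void setBody(String body) { this.body = body; }
}

// Hypothetical structural value: wraps a Msg and decides equality itself,
// here by comparing the bodies at comparison time.
final class MsgStructuralValue {
  private final Msg msg;

  MsgStructuralValue(Msg msg) { this.msg = msg; }

  @Override
  public boolean equals(Object o) {
    return o instanceof MsgStructuralValue
        && Objects.equals(msg.getBody(), ((MsgStructuralValue) o).msg.getBody());
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(msg.getBody());
  }
}

// Hypothetical coder; only the structural-value hook is shown.
final class MsgCoder {
  Object structuralValue(Msg value) {
    return new MsgStructuralValue(value);
  }
}

// Detector sketch that compares structural values instead of full encodings.
final class StructuralValueDetector {
  static boolean unmodified(Msg clonedOriginal, Msg possiblyModified, MsgCoder coder) {
    return Objects.equals(clonedOriginal, possiblyModified)
        || Objects.equals(coder.structuralValue(clonedOriginal),
                          coder.structuralValue(possiblyModified));
  }
}
```

The design choice here is that equality lives in the coder, so the element type itself does not need to change.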
On Tue, Jun 20, 2017 at 8:16 AM, Lukasz Cwik wrote:
Either Java object equality or its coder needs to be deterministic for
that check to hold.
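To illustrate why at least one of the two must be deterministic, the sketch below uses a hypothetical StampingCoder whose output depends on how many times it has been called. With identity equals() and such a coder, two unmodified values never compare equal, so a detector would report a mutation that did not happen. All class names are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical value type without an equals() override (identity equality).
final class Payload {
  final String text;

  Payload(String text) { this.text = text; }
}

// Hypothetical coder whose output is NOT a pure function of the value: it
// stamps every encoding with an incrementing counter, standing in for things
// like timestamps or unordered iteration inside a real coder.
final class StampingCoder {
  private int calls = 0;

  byte[] encode(Payload p) {
    calls++;
    return (calls + ":" + p.text).getBytes(StandardCharsets.UTF_8);
  }
}

final class DeterminismDemo {
  // equals() fast path followed by a byte comparison of fresh encodings.
  static boolean looksUnmodified(Payload a, Payload b, StampingCoder coder) {
    return Objects.equals(a, b) || Arrays.equals(coder.encode(a), coder.encode(b));
  }
}
```

Two distinct but identical Payload instances fail both checks, which is the false positive the thread warns about.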
On Tue, Jun 20, 2017 at 7:49 AM, Reuven Lax wrote:
Hi,
That is only a fast path. If equals returns false, it then encodes the values to byte arrays and checks the byte arrays for equality. So as long as you have a correct coder, this should work.
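A simplified, self-contained sketch of the check described here, assuming a hypothetical Note type with identity equals() and a hypothetical deterministic NoteCoder. SketchMutationDetector only mirrors the steps discussed in this thread (encode, decode a clone, equals fast path in both directions, byte comparison); it is not Beam's CodedValueMutationDetector.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical mutable element type; like MessageImpl it keeps identity equals().
final class Note {
  String body;

  Note(String body) { this.body = body; }
}

// Hypothetical deterministic coder for Note (assumes a non-null body).
final class NoteCoder {
  byte[] encode(Note n) {
    return n.body.getBytes(StandardCharsets.UTF_8);
  }

  Note decode(byte[] bytes) {
    return new Note(new String(bytes, StandardCharsets.UTF_8));
  }
}

final class SketchMutationDetector {
  private final NoteCoder coder;
  private final Note possiblyModified;
  private final byte[] originalEncoding;
  private final Note clonedOriginal;

  SketchMutationDetector(Note value, NoteCoder coder) {
    this.coder = coder;
    this.possiblyModified = value;
    this.originalEncoding = coder.encode(value);
    this.clonedOriginal = coder.decode(originalEncoding);
  }

  boolean isUnmodified() {
    // Fast path: only conclusive when equals() returns true.
    if (Objects.equals(possiblyModified, clonedOriginal)
        || Objects.equals(clonedOriginal, possiblyModified)) {
      return true;
    }
    // Fallback: re-encode the current value and compare with the original bytes.
    return Arrays.equals(originalEncoding, coder.encode(possiblyModified));
  }
}
```

Even though the decoded clone is never equals() to the original, an unmodified Note still passes through the byte comparison.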
On Tue, Jun 20, 2017 at 2:06 AM, Jean-Baptiste Onofré wrote:
Hi Kenn,
I checked in MutationDetectors, and we use the
CodedValueMutationDetector(T value, Coder<T> coder).
To verify mutation, we use the verifyUnmodified() method calling
verifyUnmodifiedThrowingCheckedExceptions().
In the verifyUnmodifiedThrowingCheckedExceptions() method, we basically do:
if (Objects.equals(possiblyModifiedObject, clonedOriginalObject)
    || Objects.equals(clonedOriginalObject, possiblyModifiedObject)) {
  return;
}
(where clonedOriginalObject is populated with CoderUtils, by encoding the original value and decoding it back).
So, in a test, I mimic the MutationDetector by doing:
Message message = Message.Factory.create();
message.setBody(new AmqpValue("test"));
byte[] encoded = CoderUtils.encodeToByteArray(coder, message);
Message clone = CoderUtils.decodeFromByteArray(coder, encoded);
assertTrue(Objects.equals(message, clone));
assertTrue(Objects.equals(clone, message));
And it fails as Message doesn't provide a custom equals() method.
So, I guess the only way is to extend Message to provide an equals() method.
I'm experimenting with this now.
Regards
JB
On 06/19/2017 07:57 AM, Kenneth Knowles wrote:
Last I checked, equals() was used only as a shortcut. If the two are not equal according to equals(), then their encoded forms should be checked. If neither equals() nor the coder can work for this, you will have a bad time.
On Sun, Jun 18, 2017 at 10:14 PM, Jean-Baptiste Onofré wrote:
Hi team,
The direct runner checks that there's no mutation on elements in a PCollection thanks to ImmutabilityEnforcementFactory.
This factory uses CodedValueMutationDetector to detect whether an element has been changed.
The CodedValueMutationDetector uses equals (in the verifyUnmodifiedThrowingCheckedExceptions() method) for that.
However, in an IO I'm working on, the element class doesn't override equals, and it fails with:
org.apache.beam.sdk.util.IllegalMutationException: PTransform AmqpIO.Write/ParDo(Write)/ParMultiDo(Write) illegaly mutated value Message{body=AmqpValue{Test 0}} of class class org.apache.qpid.proton.message.impl.MessageImpl. Input values must not be mutated in any way.
So, basically, my questions are:
1. Do I need to wrap the message in a custom wrapper overriding the equals() method?
2. Maybe we could improve the checker in the direct runner a bit?
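For option 1, a wrapper could look roughly like the sketch below. RawMessage is a hypothetical stand-in for the Proton message (identity equals()), EqualsMessage is a hypothetical wrapper that defines equals() and hashCode() on the body only, and WrapperRoundTrip fakes an encode/decode round trip through a coder. It mirrors the two Objects.equals checks from the test quoted above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for a third-party message type that keeps identity equals().
final class RawMessage {
  private String body;

  RawMessage(String body) { this.body = body; }

  String getBody() { return body; }

  void setBody(String body) { this.body = body; }
}

// Hypothetical wrapper: equality and hash code are defined on the body only.
final class EqualsMessage {
  private final RawMessage message;

  EqualsMessage(RawMessage message) { this.message = message; }

  RawMessage get() { return message; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof EqualsMessage)) return false;
    return Objects.equals(message.getBody(), ((EqualsMessage) o).message.getBody());
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(message.getBody());
  }
}

// Hypothetical round trip standing in for encode/decode with a real coder
// (assumes a non-null body).
final class WrapperRoundTrip {
  static EqualsMessage roundTrip(EqualsMessage m) {
    byte[] encoded = m.get().getBody().getBytes(StandardCharsets.UTF_8);
    return new EqualsMessage(new RawMessage(new String(encoded, StandardCharsets.UTF_8)));
  }
}
```

The wrapped messages still compare by identity; only the wrapper gains value equality, and keeping equals() and hashCode() consistent matters if the wrapper is ever used as a key.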
Thanks
Regards
JB
--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com