[
https://issues.apache.org/jira/browse/BEAM-12474?focusedWorklogId=620119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-620119
]
ASF GitHub Bot logged work on BEAM-12474:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jul/21 18:04
Start Date: 07/Jul/21 18:04
Worklog Time Spent: 10m
Work Description: kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665587222
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
Review comment:
Noting that this namespace is for non-user-facing classes that have no
backwards compatibility guarantees.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
}
}
+ /**
+ * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+ * with the original value as a {@link KV}.
+ *
+ * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+ * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+ * successfully applied if the consuming transform passes type information to the failure
+ * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+ * class rather than of this class directly.
+ */
+ public static class ThrowableHandler<T>
Review comment:
I'm not sure if you know this, but you can do `new SimpleFunction<Foo,
Bar>(<lambda>) {}` and the curly braces at the end cause it to be a subclass
with type information preserved. I think having this class is fine, too. But if
it is just boilerplate to preserve types then you might be able to use the
inline anonymous subclass trick.
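
For readers unfamiliar with the trick mentioned above, here is a minimal sketch in plain Java. `TypedFn` is a hypothetical stand-in written for this illustration, not Beam's `SimpleFunction`. The sketch only shows the underlying reflection mechanism: an anonymous subclass records its generic superclass, so the type arguments can be read back at runtime.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.function.Function;

final class TypeCaptureDemo {
  /** Hypothetical function wrapper used only for this illustration. */
  static class TypedFn<InT, OutT> {
    private final Function<InT, OutT> fn;

    TypedFn(Function<InT, OutT> fn) {
      this.fn = fn;
    }

    OutT apply(InT input) {
      return fn.apply(input);
    }

    /** Describes the type arguments visible through the generic superclass, if any. */
    String capturedTypes() {
      Type superType = getClass().getGenericSuperclass();
      if (superType instanceof ParameterizedType) {
        return Arrays.toString(((ParameterizedType) superType).getActualTypeArguments());
      }
      // A direct instance of TypedFn has Object as its superclass: nothing to read.
      return "erased";
    }
  }

  /** Direct instantiation: the type arguments are erased. */
  static String plainTypes() {
    return new TypedFn<String, Integer>(String::length).capturedTypes();
  }

  /** Trailing {} creates an anonymous subclass whose superclass is TypedFn<String, Integer>. */
  static String anonymousTypes() {
    return new TypedFn<String, Integer>(String::length) {}.capturedTypes();
  }

  public static void main(String[] args) {
    System.out.println(plainTypes()); // erased
    System.out.println(anonymousTypes()); // [class java.lang.String, class java.lang.Integer]
  }
}
```

The only difference between the two factory methods is the trailing `{}`, which is why the inline form can stand in for a dedicated named subclass when the subclass exists purely to preserve types.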
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {
+ private Throwable throwable;
+
+ private ComparableThrowable() {
+ // Can't set this to null without adding a pointless @Nullable annotation to the field. It also
+ // needs to be set from the constructor to avoid a checkstyle violation.
+ this.throwable = new Throwable();
+ }
+
+ /** Wraps {@code throwable} and returns the result. */
+ public static ComparableThrowable forThrowable(Throwable throwable) {
+ ComparableThrowable comparable = new ComparableThrowable();
+ comparable.throwable = throwable;
+ return comparable;
+ }
+
+ /** Returns the underlying {@link Throwable}. */
+ public Throwable throwable() {
+ return throwable;
+ }
+
+ @Override
+ public int hashCode() {
+ return throwable.hashCode();
+ }
+
+ @Override
+ public boolean equals(@Nullable Object obj) {
+ if (!(obj instanceof ComparableThrowable)) {
+ return false;
+ }
+ Throwable other = ((ComparableThrowable) obj).throwable;
+
+ boolean currentLevelEqual =
+ throwable.getClass().isInstance(other)
Review comment:
I think you want to check if their classes are equal. This will be fine
for mutation detection, etc, which is not really using equality except to check
that nothing broke.
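
To make the difference concrete, here is a small sketch using only JDK exception types (the class and method names are made up for this illustration). `Class#isInstance` accepts subclasses and is therefore asymmetric, while comparing the two `getClass()` results is symmetric, which is what an `equals` implementation needs.

```java
final class ClassCheckDemo {
  /** The check as written in the diff: true when b is an instance of a's class or a subclass. */
  static boolean isInstanceCheck(Throwable a, Throwable b) {
    return a.getClass().isInstance(b);
  }

  /** The suggested alternative: true only when both have exactly the same runtime class. */
  static boolean sameClassCheck(Throwable a, Throwable b) {
    return a.getClass().equals(b.getClass());
  }

  public static void main(String[] args) {
    // IllegalStateException is a subclass of RuntimeException.
    Throwable general = new RuntimeException("boom");
    Throwable specific = new IllegalStateException("boom");

    System.out.println(isInstanceCheck(general, specific)); // true
    System.out.println(isInstanceCheck(specific, general)); // false (asymmetric)
    System.out.println(sameClassCheck(general, specific)); // false
    System.out.println(sameClassCheck(specific, general)); // false (symmetric)
  }
}
```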
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+public class ComparableThrowable implements Serializable {
Review comment:
Yeah, I was going to comment on this. It is OK to be clunky. Perhaps
`ThrowableWithEquals`. The problem, of course, is that inheriting concrete
implementations of `equals` carries risk. Subclasses will most likely not
implement `equals`, so if they carry additional information they will have an
incorrect implementation. (This is mitigated when they have an interface like
`List` that specifies what equality means, and equality is defined in
terms of the interface.)
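
A minimal sketch of the risk described above, using hypothetical classes `Base` and `WithCode` that are not part of the PR: the subclass adds a field but inherits `equals`, so two instances that differ only in that field still compare equal.

```java
import java.util.Objects;

final class InheritedEqualsDemo {
  /** Hypothetical base class with a concrete equals based on its only field. */
  static class Base {
    final String message;

    Base(String message) {
      this.message = message;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Base && Objects.equals(message, ((Base) o).message);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(message);
    }
  }

  /** Hypothetical subclass that carries extra state but does not override equals. */
  static class WithCode extends Base {
    final int code;

    WithCode(String message, int code) {
      super(message);
      this.code = code;
    }
  }

  /** Compares two instances that differ only in the field the inherited equals ignores. */
  static boolean differentCodesCompareEqual() {
    return new WithCode("failed", 404).equals(new WithCode("failed", 500));
  }

  public static void main(String[] args) {
    System.out.println(differentCodesCompareEqual()); // true, although the codes differ
  }
}
```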
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+public class ComparableThrowable implements Serializable {
Review comment:
Having read the rest of the PR, I think the reason for this is to make
it a friendly Beam element, so `SerializableThrowable` might be OK.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
getIdAttribute(),
getNeedsAttributes(),
getNeedsMessageId());
- PCollection<T> read =
- input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+ PCollection<T> read;
+ PCollection<PubsubMessage> preParse = input.apply(source);
+ TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+ if (getDeadLetterTopicProvider() == null) {
+ read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+ } else {
+ Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+ preParse.apply(
+ "PubsubIO.Read/Map/Parse-Incoming-Messages",
+ MapElements.into(typeDescriptor)
+ .via(getParseFn())
+ .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+ read = result.output();
+
+ // Write out failures to the provided dead-letter topic.
+ result
+ .failures()
Review comment:
OK so seeing this, I _think_ there is no update problem because the
throwable is never actually serialized.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
}
}
+ /**
+ * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+ * with the original value as a {@link KV}.
+ *
+ * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+ * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+ * successfully applied if the consuming transform passes type information to the failure
+ * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+ * class rather than of this class directly.
+ */
+ public static class ThrowableHandler<T>
+ extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+ @Override
+ public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+ return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));
Review comment:
Unless I missed something, this output will be inferred to use
`KvCoder.of(<T coder>, SerializableCoder)`. So this may create problems for
pipeline update.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
Review comment:
`man child classes`?
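
As background for the Javadoc sentence quoted above, here is a minimal sketch of the behavior it describes, using plain Java serialization rather than a Beam coder (the class and helper names are made up for this illustration). A `Throwable` that has been serialized and deserialized keeps its class and message but is not `equals` to the original, because `Throwable` inherits reference equality from `Object`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

final class ThrowableRoundTripDemo {
  /** Serializes and deserializes the given throwable with standard Java serialization. */
  static Throwable roundTrip(Throwable t) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(t);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (Throwable) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Throwable original = new IllegalArgumentException("bad payload");
    Throwable copy = roundTrip(original);

    // Throwable inherits Object#equals, which is reference equality.
    System.out.println(original.equals(copy)); // false
    // The observable properties survive the round trip.
    System.out.println(original.getClass() == copy.getClass()); // true
    System.out.println(original.getMessage().equals(copy.getMessage())); // true
  }
}
```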
Issue Time Tracking
-------------------
Worklog Id: (was: 620119)
Time Spent: 3h 40m (was: 3.5h)
> Configure PubsubIO with dead-letter topic
> -----------------------------------------
>
> Key: BEAM-12474
> URL: https://issues.apache.org/jira/browse/BEAM-12474
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Zachary Houfek
> Assignee: Zachary Houfek
> Priority: P2
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> Based on discussion from [PR14971|https://github.com/apache/beam/pull/14971].
> Currently, PubsubIO throws a RuntimeException when it can't parse a payload.
> It would perhaps be helpful if users could configure PubsubIO.Read with a
> dead-letter topic so that parsing errors instead write to the topic rather
> than just throw an exception.