[
https://issues.apache.org/jira/browse/BEAM-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jeff Klukas updated BEAM-5413:
------------------------------
Description:
Defining a composite transform today requires writing a full named subclass of
PTransform (as [the programming guide
documents|https://beam.apache.org/documentation/programming-guide/#composite-transforms]),
but there are cases where users may want to define a fairly trivial composite
transform using a less verbose Java 8 lambda expression.
Consider an example where the user has defined MyDeserializationTransform that
attempts to deserialize byte arrays into some object, returning a
PCollectionTuple with tags for successfully deserialized records (mainTag) and
for errors (errorTag).
If we introduce a PTransform::compose method that takes in a
SerializableFunction, the user can handle errors in a small lambda expression:
{code:java}
byteArrays
    .apply("attempt to deserialize messages",
        new MyDeserializationTransform())
    .apply("write deserialization errors",
        PTransform.compose((PCollectionTuple input) -> {
          // Side branch: send errors to their own sink, then pass the main output on.
          input
              .get(errorTag)
              .apply(new MyErrorOutputTransform());
          return input.get(mainTag);
        }))
    .apply("more processing on the deserialized messages",
        new MyOtherTransform());
{code}
This style allows a more concise and fluent pipeline definition than is
currently possible.
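To illustrate the mechanism, here is a minimal, dependency-free sketch of how such a compose factory could wrap a lambda in an anonymous subclass. Transform, SerializableFn, Tuple, and handleErrors are hypothetical stand-ins invented for this example; they are not Beam classes and this is not necessarily how the eventual implementation would look.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ComposeSketch {

  /** Hypothetical stand-in for a serializable function (not Beam's SerializableFunction). */
  public interface SerializableFn<I, O> extends Function<I, O>, Serializable {}

  /** Hypothetical stand-in for PTransform: subclasses implement expand(). */
  public abstract static class Transform<I, O> {
    public abstract O expand(I input);

    /** Wraps a lambda in an anonymous subclass so the caller needs no named class. */
    public static <I, O> Transform<I, O> compose(SerializableFn<I, O> fn) {
      return new Transform<I, O>() {
        @Override
        public O expand(I input) {
          return fn.apply(input);
        }
      };
    }
  }

  /** Hypothetical stand-in for a tuple carrying a main output and an error output. */
  public static final class Tuple {
    public final List<String> main;
    public final List<String> errors;

    public Tuple(List<String> main, List<String> errors) {
      this.main = main;
      this.errors = errors;
    }
  }

  /** Routes errors to errorSink as a side effect and returns the main output. */
  public static List<String> handleErrors(Tuple input, List<String> errorSink) {
    Transform<Tuple, List<String>> transform =
        Transform.compose((Tuple t) -> {
          errorSink.addAll(t.errors);
          return t.main;
        });
    return transform.expand(input);
  }

  public static void main(String[] args) {
    List<String> errorSink = new ArrayList<>();
    Tuple input = new Tuple(Arrays.asList("a", "b"), Arrays.asList("bad record"));
    List<String> mainOutput = handleErrors(input, errorSink);
    System.out.println("main: " + mainOutput + ", errors: " + errorSink);
  }
}
```

The factory only adapts a function to the existing expand contract, so a transform built this way could be applied anywhere a named subclass is accepted.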
was:
Defining a composite transform today requires writing a full named subclass of
PTransform (as [the programming guide
documents|[https://beam.apache.org/documentation/programming-guide/#composite-transforms]),]
but there are cases where users may want to define a fairly trivial composite
transform using a less verbose Java 8 lambda expression.
Consider an example where the user has defined MyDeserializationTransform that
attempts to deserialize byte arrays into some object, returning a
PCollectionTuple with tags for successfully deserialized records (mainTag) and
for errors (errorTag).
If we introduce a PTransform::compose method that takes in a
SerializableFunction, the user can handle errors in a small lambda expression:
{code:java}
byteArrays
.apply("attempt to deserialize messages",
new MyDeserializationTransform())
.apply("write deserialization errors",
PTransform.compose((PCollectionTuple input) -> {
input
.get(errorTag)
.apply(new MyErrorOutputTransform());
return input.get(mainTag);
})
.apply("more processing on the deserialized messages",
new MyOtherTransform())
{code}
This style allows a more concise and fluent pipeline definition than is
currently possible.
> Add method for defining composite transforms as lambda expressions
> ------------------------------------------------------------------
>
> Key: BEAM-5413
> URL: https://issues.apache.org/jira/browse/BEAM-5413
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jeff Klukas
> Assignee: Kenneth Knowles
> Priority: Minor
> Fix For: 2.8.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Defining a composite transform today requires writing a full named subclass
> of PTransform (as [the programming guide
> documents|https://beam.apache.org/documentation/programming-guide/#composite-transforms]),
> but there are cases where users may want to define a fairly trivial
> composite transform using a less verbose Java 8 lambda expression.
> Consider an example where the user has defined MyDeserializationTransform
> that attempts to deserialize byte arrays into some object, returning a
> PCollectionTuple with tags for successfully deserialized records (mainTag)
> and for errors (errorTag).
> If we introduce a PTransform::compose method that takes in a
> SerializableFunction, the user can handle errors in a small lambda expression:
>
> {code:java}
> byteArrays
>     .apply("attempt to deserialize messages",
>         new MyDeserializationTransform())
>     .apply("write deserialization errors",
>         PTransform.compose((PCollectionTuple input) -> {
>           // Side branch: send errors to their own sink, then pass the main output on.
>           input
>               .get(errorTag)
>               .apply(new MyErrorOutputTransform());
>           return input.get(mainTag);
>         }))
>     .apply("more processing on the deserialized messages",
>         new MyOtherTransform());
> {code}
> This style allows a more concise and fluent pipeline definition than is
> currently possible.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)