kennknowles commented on code in PR #29616:
URL: https://github.com/apache/beam/pull/29616#discussion_r1417643113
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -241,6 +244,34 @@ public static <T> TimestampedValues<T> timestamped(
return timestamped(ImmutableList.<TimestampedValue<T>>builder().add(elem).add(elems).build());
}
+ /**
+ * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection}
Review Comment:
Ah yes. Copy/paste/forget of course :-)
Thanks!
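The context lines above show the existing `timestamped(elem, elems)` overload, which takes one required element plus varargs and folds them into a single list, so that overload always needs at least one argument. Below is a minimal plain-Java sketch of that idiom. It uses `java.util` instead of Guava's `ImmutableList`, and `AtLeastOne` / `listOf` are hypothetical names for illustration, not Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the "first element plus varargs" idiom; not part of Beam.
final class AtLeastOne {
  private AtLeastOne() {}

  // Taking `first` separately means this method cannot be called with zero
  // arguments; `rest` may still be empty.
  @SafeVarargs
  static <T> List<T> listOf(T first, T... rest) {
    List<T> out = new ArrayList<>(1 + rest.length);
    out.add(first);
    out.addAll(Arrays.asList(rest));
    // Unmodifiable view, similar in spirit to ImmutableList.
    return Collections.unmodifiableList(out);
  }
}
```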
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T> element, OutputReceiver<
}
}
+ /**
+ * A {@code PTransform} that creates a {@code PCollection} whose elements have associated
+ * timestamps.
+ */
+ public static class WindowedValues<T> extends PTransform<PBegin, PCollection<T>> {
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class, and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withCoder(Coder<T> coder) {
+ return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
Review Comment:
Done
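The `withCoder`, `withWindowCoder`, and `withType` methods in this hunk each return a new `WindowedValues` that copies the other fields and replaces exactly one `Optional`, leaving the original transform unchanged. Below is a minimal plain-Java sketch of that copy-on-write configuration pattern. It assumes Strings stand in for the real `Coder` and `TypeDescriptor` objects, and `WindowedValuesConfig` is a hypothetical name, not Beam API.

```java
import java.util.Optional;

// Hypothetical stand-in for the fields of Create.WindowedValues; Strings replace
// the real Coder/TypeDescriptor types to keep the sketch self-contained.
final class WindowedValuesConfig {
  final Optional<String> elementCoder;
  final Optional<String> windowCoder;
  final Optional<String> typeDescriptor;

  WindowedValuesConfig(
      Optional<String> elementCoder, Optional<String> windowCoder, Optional<String> typeDescriptor) {
    this.elementCoder = elementCoder;
    this.windowCoder = windowCoder;
    this.typeDescriptor = typeDescriptor;
  }

  static WindowedValuesConfig empty() {
    return new WindowedValuesConfig(Optional.empty(), Optional.empty(), Optional.empty());
  }

  // Each "with" method copies the untouched fields and replaces exactly one,
  // so the receiver is never mutated.
  WindowedValuesConfig withCoder(String coder) {
    return new WindowedValuesConfig(Optional.of(coder), windowCoder, typeDescriptor);
  }

  WindowedValuesConfig withWindowCoder(String coder) {
    return new WindowedValuesConfig(elementCoder, Optional.of(coder), typeDescriptor);
  }

  WindowedValuesConfig withType(String type) {
    return new WindowedValuesConfig(elementCoder, windowCoder, Optional.of(type));
  }
}
```

Because every call returns a fresh object, a base configuration can be shared and specialized without one caller's settings leaking into another's.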
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -2216,6 +2216,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
}
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo) {
+ // TODO: Check that timestamp is valid once all runners can provide proper timestamps.
Review Comment:
TBH I just copy/pasted the comment. Filed https://github.com/apache/beam/issues/29637 and linked it to each of the TODOs.
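Unlike `outputWithTimestamp` in the context lines, which reuses `currentWindow` and the current element's pane, `outputWindowedValue` lets the caller supply the timestamp, the windows, and the `PaneInfo` explicitly. The plain-Java sketch below is purely illustrative: `WindowedOutput` and `ListReceiver` are hypothetical types, windows and panes are modeled as Strings, `java.time.Instant` stands in for the Joda `Instant` Beam uses, and the timestamp check is a hypothetical version of the validation the TODO defers, assuming it would compare against the input timestamp minus an allowed skew. It records one entry per window, i.e. a single multi-window value exploded per window.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical record of one output in one window; not a Beam type.
final class WindowedOutput<T> {
  final T value;
  final Instant timestamp;
  final String window; // windows modeled as Strings for the sketch
  final String pane; // PaneInfo modeled as a String

  WindowedOutput(T value, Instant timestamp, String window, String pane) {
    this.value = value;
    this.timestamp = timestamp;
    this.window = window;
    this.pane = pane;
  }
}

// Hypothetical receiver illustrating an outputWindowedValue-style method.
final class ListReceiver<T> {
  final List<WindowedOutput<T>> outputs = new ArrayList<>();
  private final Instant inputTimestamp;
  private final Duration allowedSkew;

  ListReceiver(Instant inputTimestamp, Duration allowedSkew) {
    this.inputTimestamp = inputTimestamp;
    this.allowedSkew = allowedSkew;
  }

  void outputWindowedValue(T output, Instant timestamp, Collection<String> windows, String pane) {
    // Hypothetical check of the kind the TODO defers: reject outputs earlier
    // than the input timestamp minus the allowed skew.
    if (timestamp.isBefore(inputTimestamp.minus(allowedSkew))) {
      throw new IllegalArgumentException(
          "Output timestamp " + timestamp + " is earlier than input " + inputTimestamp
              + " minus allowed skew " + allowedSkew);
    }
    // Record one entry per window.
    for (String window : windows) {
      outputs.add(new WindowedOutput<>(output, timestamp, window, pane));
    }
  }
}
```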
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T> element, OutputReceiver<
}
}
+ /**
+ * A {@code PTransform} that creates a {@code PCollection} whose elements have associated
+ * timestamps.
+ */
+ public static class WindowedValues<T> extends PTransform<PBegin, PCollection<T>> {
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class, and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withCoder(Coder<T> coder) {
+ return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class, and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withWindowCoder(Coder<? extends BoundedWindow> windowCoder) {
+ return new WindowedValues<>(
+ windowedValues, elementCoder, Optional.of(windowCoder), typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
+ * {@code Schema} to represent objects.
+ */
+ public WindowedValues<T> withSchema(
+ Schema schema,
+ TypeDescriptor<T> typeDescriptor,
+ SerializableFunction<T, Row> toRowFunction,
+ SerializableFunction<Row, T> fromRowFunction) {
+ return withCoder(SchemaCoder.of(schema, typeDescriptor, toRowFunction,
fromRowFunction));
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+ * TypeDescriptor<T>} to determine the {@code Coder} to use to decode each of the objects into a
+ * value of type {@code T}. Note that a default coder must be registered for the class described
+ * in the {@code TypeDescriptor<T>}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class, and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.WindowedValues} with no elements, the {@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withType(TypeDescriptor<T> type) {
+ return new WindowedValues<>(windowedValues, elementCoder, windowCoder, Optional.of(type));
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ try {
+ Coder<T> coder = null;
+ CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
+ SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+ if (elementCoder.isPresent()) {
+ coder = elementCoder.get();
+ } else if (typeDescriptor.isPresent()) {
+ try {
+ coder =
+ SchemaCoder.of(
+ schemaRegistry.getSchema(typeDescriptor.get()),
+ typeDescriptor.get(),
+ schemaRegistry.getToRowFunction(typeDescriptor.get()),
+ schemaRegistry.getFromRowFunction(typeDescriptor.get()));
+ } catch (NoSuchSchemaException e) {
+ // No schema registered.
+ }
+ if (coder == null) {
+ coder = coderRegistry.getCoder(typeDescriptor.get());
+ }
+ } else {
+ Iterable<T> rawElements = Iterables.transform(windowedValues, WindowedValue::getValue);
+ coder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawElements);
+ }
Review Comment:
Yeah, I need to look into this. I actually wrote the original "infer coder from raw values" logic back in the day. It has since been improved and had schema support added, so I'm not 100% sure how best to generalize it.
Also, while building this I noticed that window types are not in the coder registry, because the WindowFn already tells the pipeline which coder to use. Probably not a big deal, since it never mattered until this PR.
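The `expand` hunk resolves the element coder in a fixed order: an explicit `withCoder` wins; otherwise, given a `TypeDescriptor`, a registered schema is tried before falling back to the `CoderRegistry`; otherwise the coder is inferred from the raw values, and per the Javadoc an empty input gets `VoidCoder`. Below is a plain-Java sketch of that precedence. It assumes `Map`s stand in for the schema and coder registries and Strings for coders, and `CoderResolution` is a hypothetical name. The inference branch only covers the "all elements share one runtime class" case from the Javadoc, not the full `getDefaultCreateCoder` logic (whose signature in the hunk also takes the `SchemaRegistry`).

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the coder-resolution order in the expand() hunk.
// Coders are Strings; the two maps stand in for SchemaRegistry and CoderRegistry.
final class CoderResolution {
  private CoderResolution() {}

  static String resolve(
      Optional<String> explicitCoder,
      Optional<Class<?>> type,
      Iterable<?> rawElements,
      Map<Class<?>, String> schemaRegistry,
      Map<Class<?>, String> coderRegistry) {
    // 1. An explicitly supplied coder always wins.
    if (explicitCoder.isPresent()) {
      return explicitCoder.get();
    }
    // 2. With a type, prefer a registered schema, else the registry's default coder.
    if (type.isPresent()) {
      String schema = schemaRegistry.get(type.get());
      if (schema != null) {
        return "SchemaCoder(" + schema + ")";
      }
      return lookup(coderRegistry, type.get());
    }
    // 3. Otherwise infer from the raw (assumed non-null) values; none means VoidCoder.
    Iterator<?> it = rawElements.iterator();
    if (!it.hasNext()) {
      return "VoidCoder";
    }
    Class<?> cls = it.next().getClass();
    while (it.hasNext()) {
      if (it.next().getClass() != cls) {
        throw new IllegalArgumentException("Elements have mixed runtime classes; set a coder");
      }
    }
    return lookup(coderRegistry, cls);
  }

  private static String lookup(Map<Class<?>, String> coderRegistry, Class<?> cls) {
    String coder = coderRegistry.get(cls);
    if (coder == null) {
      throw new IllegalArgumentException("No default coder registered for " + cls.getName());
    }
    return coder;
  }
}
```

Window coders are deliberately left out of this sketch: as the comment above notes, they are not in the registry because the WindowFn supplies them.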