johnjcasey commented on code in PR #29616:
URL: https://github.com/apache/beam/pull/29616#discussion_r1416242025
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -241,6 +244,34 @@ public static <T> TimestampedValues<T> timestamped(
return
timestamped(ImmutableList.<TimestampedValue<T>>builder().add(elem).add(elems).build());
}
+ /**
+ * Returns a new {@link Create.TimestampedValues} transform that produces a
{@link PCollection}
Review Comment:
looks like some typos in the docstring
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T>
element, OutputReceiver<
}
}
+ /**
+ * A {@code PTransform} that creates a {@code PCollection} whose elements
have associated
+ * timestamps.
+ */
+ public static class WindowedValues<T> extends PTransform<PBegin,
PCollection<T>> {
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that
uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically
determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class,
and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how
defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the
{@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withCoder(Coder<T> coder) {
+ return new WindowedValues<>(windowedValues, Optional.of(coder),
windowCoder, typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that
uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically
determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class,
and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how
defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the
{@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withWindowCoder(Coder<? extends BoundedWindow>
windowCoder) {
+ return new WindowedValues<>(
+ windowedValues, elementCoder, Optional.of(windowCoder),
typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.TimestampedValues} PTransform like this one
that uses the given
+ * {@code Schema} to represent objects.
+ */
+ public WindowedValues<T> withSchema(
+ Schema schema,
+ TypeDescriptor<T> typeDescriptor,
+ SerializableFunction<T, Row> toRowFunction,
+ SerializableFunction<Row, T> fromRowFunction) {
+ return withCoder(SchemaCoder.of(schema, typeDescriptor, toRowFunction,
fromRowFunction));
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that
uses the given {@code
+ * TypeDescriptor<T>} to determine the {@code Coder} to use to decode each
of the objects into a
+ * value of type {@code T}. Note that a default coder must be registered
for the class described
+ * in the {@code TypeDescriptor<T>}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically
determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class,
and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how
defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.WindowedValues} with no elements, the
{@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withType(TypeDescriptor<T> type) {
+ return new WindowedValues<>(windowedValues, elementCoder, windowCoder,
Optional.of(type));
+ }
+
+ @Override
+ public PCollection<T> expand(PBegin input) {
+ try {
+ Coder<T> coder = null;
+ CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
+ SchemaRegistry schemaRegistry =
input.getPipeline().getSchemaRegistry();
+ if (elementCoder.isPresent()) {
+ coder = elementCoder.get();
+ } else if (typeDescriptor.isPresent()) {
+ try {
+ coder =
+ SchemaCoder.of(
+ schemaRegistry.getSchema(typeDescriptor.get()),
+ typeDescriptor.get(),
+ schemaRegistry.getToRowFunction(typeDescriptor.get()),
+ schemaRegistry.getFromRowFunction(typeDescriptor.get()));
+ } catch (NoSuchSchemaException e) {
+ // No schema registered.
+ }
+ if (coder == null) {
+ coder = coderRegistry.getCoder(typeDescriptor.get());
+ }
+ } else {
+ Iterable<T> rawElements = Iterables.transform(windowedValues,
WindowedValue::getValue);
+ coder = getDefaultCreateCoder(coderRegistry, schemaRegistry,
rawElements);
+ }
Review Comment:
Unrelated to this PR, but this coder lookup pattern might be useful to
extract into a utility.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -2216,6 +2216,16 @@ public void outputWithTimestamp(OutputT output, Instant
timestamp) {
WindowedValue.of(output, timestamp, currentWindow,
currentElement.getPane()));
}
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo) {
+ // TODO: Check that timestamp is valid once all runners can provide
proper timestamps.
Review Comment:
Is validity just that the timestamp is in the window?
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T>
element, OutputReceiver<
}
}
+ /**
+ * A {@code PTransform} that creates a {@code PCollection} whose elements
have associated
+ * timestamps.
+ */
+ public static class WindowedValues<T> extends PTransform<PBegin,
PCollection<T>> {
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that
uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically
determine the {@code Coder}
+ * to use if all elements have the same non-parameterized run-time class,
and a default coder is
+ * registered for that class. See {@link CoderRegistry} for details on how
defaults are
+ * determined.
+ *
+ * <p>Note that for {@link Create.TimestampedValues with no elements}, the
{@link VoidCoder} is
+ * used.
+ */
+ public WindowedValues<T> withCoder(Coder<T> coder) {
+ return new WindowedValues<>(windowedValues, Optional.of(coder),
windowCoder, typeDescriptor);
+ }
+
+ /**
+ * Returns a {@link Create.WindowedValues} PTransform like this one that
uses the given {@code
+ * Coder<T>} to decode each of the objects into a value of type {@code T}.
+ *
+ * <p>By default, {@code Create.TimestampedValues} can automatically
determine the {@code Coder}
Review Comment:
same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]