kennknowles commented on code in PR #29616:
URL: https://github.com/apache/beam/pull/29616#discussion_r1417643113


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -241,6 +244,34 @@ public static <T> TimestampedValues<T> timestamped(
     return timestamped(ImmutableList.<TimestampedValue<T>>builder().add(elem).add(elems).build());
   }
 
+  /**
+   * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection}

Review Comment:
   Ah yes. Copy/paste/forget of course :-)
   
   Thanks!



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T> element, OutputReceiver<
     }
   }
 
+  /**
+   * A {@code PTransform} that creates a {@code PCollection} whose elements have associated
+   * timestamps.
+   */
+  public static class WindowedValues<T> extends PTransform<PBegin, PCollection<T>> {
+
+    /**
+     * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+     * Coder<T>} to decode each of the objects into a value of type {@code T}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+     * to use if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+     * used.
+     */
+    public WindowedValues<T> withCoder(Coder<T> coder) {
+      return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor);
+    }
+
+    /**
+     * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+     * Coder<T>} to decode each of the objects into a value of type {@code T}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}

Review Comment:
   Done
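The `withCoder` and `withWindowCoder` methods in the diff above never mutate the transform; each returns a new `WindowedValues` with one `Optional` field filled in. A minimal plain-Java sketch of that copy-on-write configuration style follows, assuming coders can be modeled as strings. `WindowedValuesConfig` and its methods are hypothetical names for illustration, not Beam API.

```java
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the immutable "with*" configuration style used by
 * Create.WindowedValues in the diff. Coders are modeled as plain strings; this is not Beam code.
 */
final class WindowedValuesConfig<T> {
  private final List<T> values;
  private final Optional<String> elementCoder;
  private final Optional<String> windowCoder;

  private WindowedValuesConfig(
      List<T> values, Optional<String> elementCoder, Optional<String> windowCoder) {
    this.values = values;
    this.elementCoder = elementCoder;
    this.windowCoder = windowCoder;
  }

  static <T> WindowedValuesConfig<T> of(List<T> values) {
    // Nothing is configured yet; each "with" call fills in exactly one Optional.
    return new WindowedValuesConfig<>(List.copyOf(values), Optional.empty(), Optional.empty());
  }

  /** Returns a copy with the element coder set; the receiver is left unchanged. */
  WindowedValuesConfig<T> withCoder(String coder) {
    return new WindowedValuesConfig<>(values, Optional.of(coder), windowCoder);
  }

  /** Returns a copy with the window coder set; the receiver is left unchanged. */
  WindowedValuesConfig<T> withWindowCoder(String coder) {
    return new WindowedValuesConfig<>(values, elementCoder, Optional.of(coder));
  }

  Optional<String> elementCoder() {
    return elementCoder;
  }

  Optional<String> windowCoder() {
    return windowCoder;
  }

  List<T> values() {
    return values;
  }

  public static void main(String[] args) {
    WindowedValuesConfig<Integer> base = WindowedValuesConfig.of(List.of(1, 2, 3));
    WindowedValuesConfig<Integer> configured =
        base.withCoder("VarIntCoder").withWindowCoder("IntervalWindowCoder");
    System.out.println(base.elementCoder()); // Optional.empty
    System.out.println(configured.elementCoder()); // Optional[VarIntCoder]
    System.out.println(configured.windowCoder()); // Optional[IntervalWindowCoder]
  }
}
```

Because every instance is immutable, a partially configured transform can safely serve as the base for several differently configured copies.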



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -2216,6 +2216,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
           WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane()));
     }
 
+    @Override
+    public void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo paneInfo) {
+      // TODO: Check that timestamp is valid once all runners can provide proper timestamps.

Review Comment:
   TBH I just copy/pasted the comment. Filed 
https://github.com/apache/beam/issues/29637 and linked it to each of the TODOs.
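The new `outputWindowedValue` override lets a `DoFn` choose the output's windows and pane directly, instead of reusing the current window as `outputWithTimestamp` does in the context lines above. The sketch below models that in plain Java and adds one possible timestamp check for the linked TODO. The rule it uses (no earlier than the input timestamp minus an allowed skew) is an assumption, as are the names `WindowedOutputSketch`, `Receiver`, and `Emitted`; windows and panes are plain strings rather than Beam's `BoundedWindow` and `PaneInfo`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical model of an outputWindowedValue-style receiver; not the FnApiDoFnRunner code. */
final class WindowedOutputSketch {

  /** One emitted element together with its timestamp, windows, and pane. */
  static final class Emitted<T> {
    final T value;
    final Instant timestamp;
    final List<String> windows;
    final String pane;

    Emitted(T value, Instant timestamp, Collection<String> windows, String pane) {
      this.value = value;
      this.timestamp = timestamp;
      this.windows = List.copyOf(windows);
      this.pane = pane;
    }
  }

  /** Hypothetical receiver: the caller, not the current element, supplies windows and pane. */
  static final class Receiver<T> {
    private final Instant inputTimestamp;
    private final Duration allowedSkew;
    final List<Emitted<T>> emitted = new ArrayList<>();

    Receiver(Instant inputTimestamp, Duration allowedSkew) {
      this.inputTimestamp = inputTimestamp;
      this.allowedSkew = allowedSkew;
    }

    void outputWindowedValue(T value, Instant timestamp, Collection<String> windows, String pane) {
      // ASSUMED validity rule (the TODO does not define one): the output timestamp may not be
      // earlier than the input timestamp minus the allowed skew.
      Instant earliest = inputTimestamp.minus(allowedSkew);
      if (timestamp.isBefore(earliest)) {
        throw new IllegalArgumentException(
            "Output timestamp " + timestamp + " is before " + earliest);
      }
      emitted.add(new Emitted<>(value, timestamp, windows, pane));
    }
  }

  public static void main(String[] args) {
    Receiver<String> receiver = new Receiver<>(Instant.ofEpochSecond(100), Duration.ofSeconds(10));
    // A single output assigned to two windows at once.
    receiver.outputWindowedValue(
        "a", Instant.ofEpochSecond(95), List.of("[0s, 60s)", "[60s, 120s)"), "ON_TIME");
    System.out.println(receiver.emitted.get(0).windows.size()); // 2
  }
}
```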



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java:
##########
@@ -727,6 +758,163 @@ public void processElement(@Element TimestampedValue<T> element, OutputReceiver<
     }
   }
 
+  /**
+   * A {@code PTransform} that creates a {@code PCollection} whose elements have associated
+   * timestamps.
+   */
+  public static class WindowedValues<T> extends PTransform<PBegin, PCollection<T>> {
+
+    /**
+     * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+     * Coder<T>} to decode each of the objects into a value of type {@code T}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+     * to use if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+     * used.
+     */
+    public WindowedValues<T> withCoder(Coder<T> coder) {
+      return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor);
+    }
+
+    /**
+     * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+     * Coder<T>} to decode each of the objects into a value of type {@code T}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+     * to use if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is
+     * used.
+     */
+    public WindowedValues<T> withWindowCoder(Coder<? extends BoundedWindow> windowCoder) {
+      return new WindowedValues<>(
+          windowedValues, elementCoder, Optional.of(windowCoder), typeDescriptor);
+    }
+
+    /**
+     * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given
+     * {@code Schema} to represent objects.
+     */
+    public WindowedValues<T> withSchema(
+        Schema schema,
+        TypeDescriptor<T> typeDescriptor,
+        SerializableFunction<T, Row> toRowFunction,
+        SerializableFunction<Row, T> fromRowFunction) {
+      return withCoder(SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction));
+    }
+
+    /**
+     * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code
+     * TypeDescriptor<T>} to determine the {@code Coder} to use to decode each of the objects into a
+     * value of type {@code T}. Note that a default coder must be registered for the class described
+     * in the {@code TypeDescriptor<T>}.
+     *
+     * <p>By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder}
+     * to use if all elements have the same non-parameterized run-time class, and a default coder is
+     * registered for that class. See {@link CoderRegistry} for details on how defaults are
+     * determined.
+     *
+     * <p>Note that for {@link Create.WindowedValues} with no elements, the {@link VoidCoder} is
+     * used.
+     */
+    public WindowedValues<T> withType(TypeDescriptor<T> type) {
+      return new WindowedValues<>(windowedValues, elementCoder, windowCoder, Optional.of(type));
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      try {
+        Coder<T> coder = null;
+        CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry();
+        SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+        if (elementCoder.isPresent()) {
+          coder = elementCoder.get();
+        } else if (typeDescriptor.isPresent()) {
+          try {
+            coder =
+                SchemaCoder.of(
+                    schemaRegistry.getSchema(typeDescriptor.get()),
+                    typeDescriptor.get(),
+                    schemaRegistry.getToRowFunction(typeDescriptor.get()),
+                    schemaRegistry.getFromRowFunction(typeDescriptor.get()));
+          } catch (NoSuchSchemaException e) {
+            // No schema registered.
+          }
+          if (coder == null) {
+            coder = coderRegistry.getCoder(typeDescriptor.get());
+          }
+        } else {
+          Iterable<T> rawElements = Iterables.transform(windowedValues, WindowedValue::getValue);
+          coder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawElements);
+        }

Review Comment:
   Yeah, I need to look into this. I'm actually the original author of the "infer coder from raw values" logic, back in the day. It has since been improved and had the schema support added, so I'm not 100% sure how best to generalize it.
   
   Also, while building this I noticed that window types are not in the coder registry, because the WindowFn already tells the pipeline which coder to use. Probably not a big deal, since it never mattered until this PR.
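The `expand` method in the diff picks the element coder in a fixed order: an explicit coder from `withCoder`; then, if `withType` was called, a schema coder with a fallback to the `CoderRegistry`; and finally inference from the raw values. A simplified plain-Java model of that precedence follows. The registries are hypothetical maps, coders are strings, and `inferFromRawValues` only approximates the Javadoc rule (same non-parameterized run-time class, `VoidCoder` when empty). The real `getDefaultCreateCoder` also receives the `SchemaRegistry`, which this sketch ignores.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the element-coder precedence in expand(); not Beam code. */
final class CoderPrecedenceSketch {

  /** Hypothetical user type that has a schema registered. */
  static final class Purchase {}

  /** Stand-in for the SchemaRegistry: classes with a registered schema. */
  static final Map<Class<?>, String> SCHEMAS = Map.of(Purchase.class, "Schema(item, price)");

  /** Stand-in for the CoderRegistry: class -> default coder name. */
  static final Map<Class<?>, String> CODERS =
      Map.of(Integer.class, "VarIntCoder", String.class, "StringUtf8Coder");

  static String chooseElementCoder(
      Optional<String> explicitCoder, Optional<Class<?>> type, List<?> rawValues) {
    // 1. A coder passed via withCoder(...) always wins.
    if (explicitCoder.isPresent()) {
      return explicitCoder.get();
    }
    // 2. With a type from withType(...), try a schema coder first, then the coder registry.
    if (type.isPresent()) {
      String schema = SCHEMAS.get(type.get());
      if (schema != null) {
        return "SchemaCoder[" + schema + "]";
      }
      String coder = CODERS.get(type.get());
      if (coder == null) {
        throw new IllegalStateException("No coder registered for " + type.get().getName());
      }
      return coder;
    }
    // 3. Otherwise infer from the raw values; timestamps, windows, and panes play no part.
    return inferFromRawValues(rawValues);
  }

  /** Simplified version of the Javadoc rule; assumes there are no null elements. */
  static String inferFromRawValues(List<?> rawValues) {
    if (rawValues.isEmpty()) {
      return "VoidCoder";
    }
    Class<?> common = rawValues.get(0).getClass();
    for (Object value : rawValues) {
      if (value.getClass() != common) {
        throw new IllegalStateException("Elements have different run-time classes");
      }
    }
    if (common.getTypeParameters().length > 0) {
      throw new IllegalStateException(common.getName() + " is parameterized; set a coder");
    }
    String coder = CODERS.get(common);
    if (coder == null) {
      throw new IllegalStateException("No default coder registered for " + common.getName());
    }
    return coder;
  }

  public static void main(String[] args) {
    System.out.println(
        chooseElementCoder(Optional.of("MyCoder"), Optional.of(Integer.class), List.of(1)));
    System.out.println(chooseElementCoder(Optional.empty(), Optional.of(Purchase.class), List.of()));
    System.out.println(chooseElementCoder(Optional.empty(), Optional.empty(), List.of("a", "b")));
    System.out.println(chooseElementCoder(Optional.empty(), Optional.empty(), List.of()));
  }
}
```

Only the element coder is modeled. As the comment points out, the window coder normally comes from the `WindowFn` rather than from a registry, which is presumably why the diff also adds `withWindowCoder`.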



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
