damccorm commented on code in PR #17699:
URL: https://github.com/apache/beam/pull/17699#discussion_r875756288
##########
sdks/typescript/src/apache_beam/coders/coders.ts:
##########
@@ -38,16 +38,34 @@ interface Class<T> {
*/
class CoderRegistry {
internal_registry = {};
- get(urn: string): Class<Coder<unknown>> {
- const constructor: Class<Coder<unknown>> = this.internal_registry[urn];
+
+ getCoder(
+ urn: string,
+ payload: Uint8Array | undefined = undefined,
+ ...components: Coder<unknown>[]
+ ) {
+ const constructor: (...args) => Coder<unknown> =
+ this.internal_registry[urn];
if (constructor === undefined) {
throw new Error("Could not find coder for URN " + urn);
}
- return constructor;
+ if (payload && payload.length > 0) {
+ return constructor(payload, ...components);
+ } else {
+ return constructor(...components);
+ }
+ }
+
+ register(urn: string, constructorOrClass: Class<Coder<any>>) {
Review Comment:
This is called `constructorOrClass`, but it has to just be the class, right?
It's actually just generally unclear to me why we need this function (vs
directly calling `registerClass`) - are you intending to do some branching here
depending on the input type?
##########
sdks/typescript/src/apache_beam/transforms/flatten.ts:
##########
@@ -22,23 +22,27 @@ import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { GeneralObjectCoder } from "../coders/js_coders";
-export class Flatten<T> extends PTransform<PCollection<T>[], PCollection<T>> {
- // static urn: string =
runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:flatten:v1";
- name = "Flatten";
-
- expandInternal(
+export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
+ function expandInternal(
inputs: PCollection<T>[],
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: Flatten.urn,
+ urn: flatten.urn,
payload: null!,
});
- // TODO: Input coder if they're all the same? UnionCoder?
- return pipeline.createPCollectionInternal<T>(new GeneralObjectCoder());
+ // TODO: UnionCoder if they're not the same?
+ const coders = new Set(
+ inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
+ );
+ const coder =
+ coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+ return pipeline.createPCollectionInternal<T>(coder);
}
+
+ return withName("flatten", expandInternal);
}
+
+flatten.urn = "beam:transform:flatten:v1";
Review Comment:
What are your thoughts on moving this (and other similar URNs) into a shared
constants file, in this or a future PR? I saw this referenced in an earlier
file and was confused, since you don't often see functions with fields assigned
to them. I'm not a JS expert, but I haven't seen this pattern before
(admittedly, I also can't find any style guides that mention it positively or
negatively).
##########
sdks/typescript/src/apache_beam/transforms/pardo.ts:
##########
@@ -140,96 +135,77 @@ export class ParDo<InputT, OutputT, ContextT = undefined>
extends PTransform<
new GeneralObjectCoder()
);
}
+
+ return withName(`parDo(${extractName(doFn)})`, expandInternal);
}
+// TODO: (Cleanup) use
runnerApi.StandardPTransformClasss_Primitives.PAR_DO.urn.
+parDo.urn = "beam:transform:pardo:v1";
+
+export type SplitOptions = {
+ knownTags?: string[];
+ unknownTagBehavior?: "error" | "ignore" | "rename" | undefined;
+ unknownTagName?: string;
+ exclusive?: boolean;
+};
+
+/**
+ * Splits a single PCollection of objects, with keys k, into an object of
+ * PCollections, with the same keys k, where each PCollection consists of the
+ * values associated with that key. That is,
+ *
+ * PCollection<{a: T, b: U, ...}> maps to {a: PCollection<T>, b:
PCollection<U>, ...}
+ */
// TODO: (API) Consider as top-level method.
// TODO: Naming.
-// TODO: Allow default? Technically splitter can be implemented/wrapped to
produce such.
-// TODO: Can we enforce splitter's output with the typing system to lie in
targets?
-// TODO: (Optionally?) delete the switched-on field.
-// TODO: (API) Consider doing
-// [{a: aValue}, {g: bValue}, ...] => a: [aValue, ...], b: [bValue, ...]
-// instead of
-// [{key: 'a', aValueFields}, {key: 'b', bValueFields}, ...] =>
-// a: [{key: 'a', aValueFields}, ...], b: [{key: 'b', aValueFields},
...],
-// (implemented below as Split2).
-export class Split<T> extends PTransform<
- PCollection<T>,
- { [key: string]: PCollection<T> }
-> {
- private tags: string[];
- constructor(private splitter: (T) => string, ...tags: string[]) {
- super("Split(" + tags + ")");
- this.tags = tags;
- }
- expandInternal(
+export function split<T extends { [key: string]: unknown }>(
+ tags: string[],
+ options: SplitOptions = {}
+): PTransform<PCollection<T>, { [P in keyof T]: PCollection<T[P]> }> {
+ function expandInternal(
input: PCollection<T>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
- transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.SPLITTING_JS_DOFN_URN,
- payload: serializeFn({ splitter: this.splitter }),
- }),
- })
- ),
- });
-
- const this_ = this;
- return Object.fromEntries(
- this_.tags.map((tag) => [
- tag,
- pipeline.createPCollectionInternal<T>(
- pipeline.context.getPCollectionCoderId(input)
- ),
- ])
- );
- }
-}
+ if (options.exclusive === undefined) {
+ options.exclusive = true;
+ }
+ if (options.unknownTagBehavior === undefined) {
+ options.unknownTagBehavior = "error";
+ }
+ if (
+ options.unknownTagBehavior == "rename" &&
+ !tags.includes(options.unknownTagName!)
+ ) {
+ tags.push(options.unknownTagName!);
+ }
+ if (options.knownTags === undefined) {
+ options.knownTags = tags;
+ }
-// TODO: (Typescript) Is it possible to type that this takes
-// PCollection<{a: T, b: U, ...}> to {a: PCollection<T>, b: PCollection<U>,
...}
-// Seems to requires a cast inside expandInternal. But at least the cast is
contained there.
-export class Split2<T extends { [key: string]: unknown }> extends PTransform<
Review Comment:
It's not totally clear to me why we had this initially, or why we're now
removing `Split` - could you help me understand what the `Split` vs `Split2`
distinction was for, and why it's good to now coalesce the behavior?
##########
sdks/typescript/src/apache_beam/transforms/flatten.ts:
##########
@@ -22,23 +22,27 @@ import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { GeneralObjectCoder } from "../coders/js_coders";
-export class Flatten<T> extends PTransform<PCollection<T>[], PCollection<T>> {
- // static urn: string =
runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:flatten:v1";
- name = "Flatten";
-
- expandInternal(
+export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
+ function expandInternal(
inputs: PCollection<T>[],
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: Flatten.urn,
+ urn: flatten.urn,
payload: null!,
});
- // TODO: Input coder if they're all the same? UnionCoder?
- return pipeline.createPCollectionInternal<T>(new GeneralObjectCoder());
+ // TODO: UnionCoder if they're not the same?
+ const coders = new Set(
+ inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
+ );
+ const coder =
+ coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+ return pipeline.createPCollectionInternal<T>(coder);
}
+
+ return withName("flatten", expandInternal);
Review Comment:
I think you're missing the `withName` import
##########
sdks/typescript/README.md:
##########
@@ -93,6 +93,12 @@ used in Python. These can be "ordinary" javascript objects
(which are passed
as is) or special DoFnParam objects which provide getters to element-specific
information (such as the current timestamp, window, or side input) at runtime.
+* Rather than introduce multiple-output complexity into the map/do operations
+themselves, producing multiple outputs is done by following with a new
+`Split` primitive is introduced that takes a
Review Comment:
```suggestion
`Split` primitive that takes a
```
##########
sdks/typescript/src/apache_beam/transforms/windowings.ts:
##########
@@ -30,106 +30,73 @@ import {
} from "../coders/standard_coders";
import { GlobalWindow, Instant, IntervalWindow } from "../values";
-export class GlobalWindows implements WindowFn<GlobalWindow> {
- assignWindows(Instant) {
- return [new GlobalWindow()];
- }
- windowCoder() {
- return new GlobalWindowCoder();
- }
- toProto() {
- return {
+export function globalWindows(): WindowFn<GlobalWindow> {
+ return {
+ assignWindows: (Instant) => [new GlobalWindow()],
+ windowCoder: () => new GlobalWindowCoder(),
+ isMerging: () => false,
+ assignsToOneWindow: () => true,
+ toProto: () => ({
urn: "beam:window_fn:global_windows:v1",
payload: new Uint8Array(),
- };
- }
- isMerging() {
- return false;
- }
- assignsToOneWindow() {
- return true;
- }
+ }),
+ };
}
-export class FixedWindows implements WindowFn<IntervalWindow> {
- size: Long;
- offset: Instant; // TODO: (Cleanup) Or should this be a long as well?
-
+export function fixedWindows(
+ sizeSeconds: number | Long,
+ offsetSeconds: Instant = Long.fromValue(0)
+): WindowFn<IntervalWindow> {
// TODO: (Cleanup) Use a time library?
- constructor(
- sizeSeconds: number | Long,
- offsetSeconds: Instant = Long.fromValue(0)
- ) {
- if (typeof sizeSeconds == "number") {
- this.size = Long.fromValue(sizeSeconds).mul(1000);
- } else {
- this.size = sizeSeconds.mul(1000);
- }
- this.offset = offsetSeconds.mul(1000);
+ if (typeof sizeSeconds == "number") {
+ sizeSeconds = Long.fromValue(sizeSeconds);
}
Review Comment:
```suggestion
```
I think you can just get rid of this, since `secsToMillisLong` does the same
check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]