robertwb commented on code in PR #17699:
URL: https://github.com/apache/beam/pull/17699#discussion_r876520113
##########
sdks/typescript/src/apache_beam/transforms/pardo.ts:
##########
@@ -140,96 +135,77 @@ export class ParDo<InputT, OutputT, ContextT = undefined>
extends PTransform<
new GeneralObjectCoder()
);
}
+
+ return withName(`parDo(${extractName(doFn)})`, expandInternal);
}
+// TODO: (Cleanup) use
runnerApi.StandardPTransformClasss_Primitives.PAR_DO.urn.
+parDo.urn = "beam:transform:pardo:v1";
+
+export type SplitOptions = {
+ knownTags?: string[];
+ unknownTagBehavior?: "error" | "ignore" | "rename" | undefined;
+ unknownTagName?: string;
+ exclusive?: boolean;
+};
+
+/**
+ * Splits a single PCollection of objects, with keys k, into an object of
+ * PCollections, with the same keys k, where each PCollection consists of the
+ * values associated with that key. That is,
+ *
+ * PCollection<{a: T, b: U, ...}> maps to {a: PCollection<T>, b:
PCollection<U>, ...}
+ */
// TODO: (API) Consider as top-level method.
// TODO: Naming.
-// TODO: Allow default? Technically splitter can be implemented/wrapped to
produce such.
-// TODO: Can we enforce splitter's output with the typing system to lie in
targets?
-// TODO: (Optionally?) delete the switched-on field.
-// TODO: (API) Consider doing
-// [{a: aValue}, {g: bValue}, ...] => a: [aValue, ...], b: [bValue, ...]
-// instead of
-// [{key: 'a', aValueFields}, {key: 'b', bValueFields}, ...] =>
-// a: [{key: 'a', aValueFields}, ...], b: [{key: 'b', aValueFields},
...],
-// (implemented below as Split2).
-export class Split<T> extends PTransform<
- PCollection<T>,
- { [key: string]: PCollection<T> }
-> {
- private tags: string[];
- constructor(private splitter: (T) => string, ...tags: string[]) {
- super("Split(" + tags + ")");
- this.tags = tags;
- }
- expandInternal(
+export function split<T extends { [key: string]: unknown }>(
+ tags: string[],
+ options: SplitOptions = {}
+): PTransform<PCollection<T>, { [P in keyof T]: PCollection<T[P]> }> {
+ function expandInternal(
input: PCollection<T>,
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
- transformProto.spec = runnerApi.FunctionSpec.create({
- urn: ParDo.urn,
- payload: runnerApi.ParDoPayload.toBinary(
- runnerApi.ParDoPayload.create({
- doFn: runnerApi.FunctionSpec.create({
- urn: urns.SPLITTING_JS_DOFN_URN,
- payload: serializeFn({ splitter: this.splitter }),
- }),
- })
- ),
- });
-
- const this_ = this;
- return Object.fromEntries(
- this_.tags.map((tag) => [
- tag,
- pipeline.createPCollectionInternal<T>(
- pipeline.context.getPCollectionCoderId(input)
- ),
- ])
- );
- }
-}
+ if (options.exclusive === undefined) {
+ options.exclusive = true;
+ }
+ if (options.unknownTagBehavior === undefined) {
+ options.unknownTagBehavior = "error";
+ }
+ if (
+ options.unknownTagBehavior == "rename" &&
+ !tags.includes(options.unknownTagName!)
+ ) {
+ tags.push(options.unknownTagName!);
+ }
+ if (options.knownTags === undefined) {
+ options.knownTags = tags;
+ }
-// TODO: (Typescript) Is it possible to type that this takes
-// PCollection<{a: T, b: U, ...}> to {a: PCollection<T>, b: PCollection<U>,
...}
-// Seems to requires a cast inside expandInternal. But at least the cast is
contained there.
-export class Split2<T extends { [key: string]: unknown }> extends PTransform<
Review Comment:
I consolidated them because having two primitives that do almost the same
thing in slightly different ways was undesirable. The original Split can easily
be implemented in terms of Split2, which is more flexible (and, IMHO, easier to
understand).
##########
sdks/typescript/src/apache_beam/transforms/flatten.ts:
##########
@@ -22,23 +22,27 @@ import { PCollection } from "../pvalue";
import { Pipeline } from "../internal/pipeline";
import { GeneralObjectCoder } from "../coders/js_coders";
-export class Flatten<T> extends PTransform<PCollection<T>[], PCollection<T>> {
- // static urn: string =
runnerApi.StandardPTransforms_Primitives.GROUP_BY_KEY.urn;
- // TODO: (Cleanup) use above line, not below line.
- static urn: string = "beam:transform:flatten:v1";
- name = "Flatten";
-
- expandInternal(
+export function flatten<T>(): PTransform<PCollection<T>[], PCollection<T>> {
+ function expandInternal(
inputs: PCollection<T>[],
pipeline: Pipeline,
transformProto: runnerApi.PTransform
) {
transformProto.spec = runnerApi.FunctionSpec.create({
- urn: Flatten.urn,
+ urn: flatten.urn,
payload: null!,
});
- // TODO: Input coder if they're all the same? UnionCoder?
- return pipeline.createPCollectionInternal<T>(new GeneralObjectCoder());
+ // TODO: UnionCoder if they're not the same?
+ const coders = new Set(
+ inputs.map((pc) => pipeline.context.getPCollectionCoderId(pc))
+ );
+ const coder =
+ coders.size == 1 ? [...coders][0] : new GeneralObjectCoder<T>();
+ return pipeline.createPCollectionInternal<T>(coder);
}
+
+ return withName("flatten", expandInternal);
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]