[
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=762402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762402
]
ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 15:43
Start Date: 26/Apr/22 15:43
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858810856
##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc, i) => acc + 1,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc: number, i: number) => acc + i,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a > b ? a : b)),
Review Comment:
Do we need an `undefined` check here? Either `a` or `b` could be `undefined`
(for example, the accumulator of a bundle that received no input).
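The reviewer's concern can be confirmed in isolation. Because `createAccumulator` returns `undefined`, a bundle that saw no input contributes an `undefined` accumulator to `mergeAccumulators`. Any `>` comparison involving `undefined` is false, so `(a > b ? a : b)` returns `b` even when `b` is the empty one. The sketch below contrasts the merge from the diff with a hypothetical `mergeMax` that skips `undefined` and iterates with `for...of`, which would also address the TODO about Arrays vs. Iterables. It is an illustration, not the SDK's eventual fix.

```typescript
// Merge as written in the diff: Array-only reduce, no undefined handling.
const originalMerge = (accumulators: any[]) =>
  accumulators.reduce((a, b) => (a > b ? a : b));

// Hypothetical undefined-aware replacement. Iterates with for...of, so it
// accepts any Iterable (Array, Set, generator), not only Arrays.
function mergeMax(accumulators: Iterable<any>): any {
  let result: any = undefined;
  for (const acc of accumulators) {
    // Skip empty accumulators; keep the larger of the non-empty ones.
    if (acc !== undefined && (result === undefined || acc > result)) {
      result = acc;
    }
  }
  return result;
}

console.log(originalMerge([5, undefined])); // undefined: the 5 is lost
console.log(mergeMax([5, undefined])); // 5
console.log(mergeMax(new Set([3, 9, undefined, 4]))); // 9
```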
##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc, i) => acc + 1,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc: number, i: number) => acc + i,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a > b ? a : b)),
+ extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a < b ? a : b)),
Review Comment:
Same question: do we need to handle `undefined` here?
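The same failure applies to `min`: merging accumulators `[2, undefined, 7]` with `(a < b ? a : b)` yields `7`. Below is a minimal sketch of an undefined-aware `min`, exercised through the full CombineFn lifecycle over simulated bundles, one of them empty. The `CombineFn` interface is a local copy of the one in group_and_combine.ts (with named parameters); `safeMin` and `runCombine` are hypothetical names, and `runCombine` only mimics the create/add/merge/extract sequence, it is not Beam's execution model.

```typescript
// Local copy of the CombineFn shape from group_and_combine.ts.
interface CombineFn<I, A, O> {
  createAccumulator: () => A;
  addInput: (acc: A, input: I) => A;
  mergeAccumulators: (accumulators: Iterable<A>) => A;
  extractOutput: (acc: A) => O;
}

// Hypothetical undefined-aware min: `undefined` means "no input seen yet".
const safeMin: CombineFn<number, number | undefined, number | undefined> = {
  createAccumulator: () => undefined,
  addInput: (acc, input) => (acc === undefined || input < acc ? input : acc),
  mergeAccumulators: (accumulators) => {
    let result: number | undefined = undefined;
    for (const acc of accumulators) {
      if (acc !== undefined && (result === undefined || acc < result)) {
        result = acc;
      }
    }
    return result;
  },
  extractOutput: (acc) => acc,
};

// Simulate a runner splitting the input into bundles; an empty bundle
// yields an accumulator that is still `undefined` when merged.
function runCombine<I, A, O>(fn: CombineFn<I, A, O>, bundles: I[][]): O {
  const accumulators = bundles.map((bundle) => {
    let acc = fn.createAccumulator();
    for (const x of bundle) acc = fn.addInput(acc, x);
    return acc;
  });
  return fn.extractOutput(fn.mergeAccumulators(accumulators));
}

console.log(runCombine(safeMin, [[4, 2], [], [7]])); // 2
```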
##########
sdks/typescript/src/apache_beam/transforms/group_and_combine.ts:
##########
@@ -0,0 +1,336 @@
+
+import { KV } from "../values";
+import { PTransform } from "./transform";
+import { PCollection } from "../pvalue";
+import * as internal from "./internal";
+import { count } from "./combiners";
+
+// TODO: (API) Consider groupBy as a top-level method on PCollections.
+// TBD how to best express the combiners.
+// - Idea 1: We could allow these as extra arguments to groupBy
+// - Idea 2: We could return a special GroupedPCollection that has a nice,
+// chain-able combining() method. We'd want the intermediates to
+// still be usable, but lazy.
+
+export interface CombineFn<I, A, O> {
+ createAccumulator: () => A;
+ addInput: (A, I) => A;
+ mergeAccumulators: (accumulators: Iterable<A>) => A;
+ extractOutput: (A) => O;
+}
+
+// TODO: (Typescript) When typing this as ((a: I, b: I) => I), types are not
+// inferred well.
+type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
+
+/**
+ * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * of elements grouped by a field, multiple fields, or an expression that is
+ * used as the grouping key.
+ *
+ * @extends PTransform
+ */
+export class GroupBy<T, K> extends PTransform<
+ PCollection<T>,
+ PCollection<KV<K, Iterable<T>>>
+> {
+ keyFn: (element: T) => K;
+ keyNames: string | string[];
+ keyName: string;
+
+ /**
+ * Create a GroupBy transform.
+ *
+ * @param key: The name of the key in the JSON object, or a function that
+ * returns the key for a given element.
+ */
+ constructor(
+ key: string | string[] | ((element: T) => K),
+ keyName: string | undefined = undefined
+ ) {
+ super();
+ [this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
+ this.keyName = typeof this.keyNames == "string" ? this.keyNames : "key";
+ }
+
+ expand(input: PCollection<T>): PCollection<KV<K, Iterable<T>>> {
+ const keyFn = this.keyFn;
+ return input
+ .map((x) => ({ key: keyFn(x), value: x }))
+ .apply(new internal.GroupByKey());
+ }
+
+ combining<I>(
+ expr: string | ((element: T) => I),
+ combiner: Combiner<I>,
+ resultName: string
+ ) {
+ return new GroupByAndCombine(this.keyFn, this.keyNames, []).combining(
+ expr,
+ combiner,
+ resultName
+ );
+ }
+}
+
+/**
+ * Groups all elements of the input PCollection together.
+ *
+ * This is generally used with one or more combining specifications, as one
+ * looses parallelization benefits in bringing all elements of a distributed
Review Comment:
```suggestion
* loses parallelization benefits in bringing all elements of a distributed
```
Nit
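For context on the GroupBy/combining API quoted above: `GroupBy.expand` pairs each element with `keyFn(x)` and applies `GroupByKey`, and `Combiner<I>` accepts either a `CombineFn` or a plain binary function. The sketch below models those semantics in memory with no Beam runtime. `toCombineFn` and `groupByAndCombine` are hypothetical, and lifting a binary function by treating `undefined` as the empty accumulator is an assumption about how such combiners could be handled, not a description of the SDK; it is only correct for associative, commutative functions.

```typescript
interface CombineFn<I, A, O> {
  createAccumulator: () => A;
  addInput: (acc: A, input: I) => A;
  mergeAccumulators: (accumulators: Iterable<A>) => A;
  extractOutput: (acc: A) => O;
}

type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);

// Hypothetical: lift a binary function such as (a, b) => a + b into a
// CombineFn, using `undefined` as the empty accumulator.
function toCombineFn<I>(combiner: Combiner<I>): CombineFn<I, any, any> {
  if (typeof combiner !== "function") return combiner;
  const binary: (a: any, b: any) => any = combiner;
  const add = (acc: any, x: any) => (acc === undefined ? x : binary(acc, x));
  return {
    createAccumulator: () => undefined,
    addInput: add,
    mergeAccumulators: (accs) => {
      let result: any = undefined;
      for (const acc of accs) if (acc !== undefined) result = add(result, acc);
      return result;
    },
    extractOutput: (acc) => acc,
  };
}

// In-memory stand-in for GroupBy(keyFn).combining(expr, combiner): key each
// element, accumulate per key, then extract one output per key.
function groupByAndCombine<T, K, I>(
  elements: T[],
  keyFn: (element: T) => K,
  expr: (element: T) => I,
  combiner: Combiner<I>
): Map<K, any> {
  const fn = toCombineFn(combiner);
  const accumulators = new Map<K, any>();
  for (const element of elements) {
    const key = keyFn(element);
    const acc = accumulators.has(key) ? accumulators.get(key) : fn.createAccumulator();
    accumulators.set(key, fn.addInput(acc, expr(element)));
  }
  const result = new Map<K, any>();
  accumulators.forEach((acc, key) => result.set(key, fn.extractOutput(acc)));
  return result;
}

const sales = [
  { shop: "a", amount: 3 },
  { shop: "b", amount: 5 },
  { shop: "a", amount: 4 },
];
const totals = groupByAndCombine(
  sales,
  (s) => s.shop,
  (s) => s.amount,
  (x: number, y: number) => x + y
);
console.log(totals); // totals: a -> 7, b -> 5
```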
##########
sdks/typescript/src/apache_beam/transforms/transform.ts:
##########
@@ -0,0 +1,120 @@
+
+import * as runnerApi from "../proto/beam_runner_api";
+import { PValue } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+
+export function withName<T>(name: string | (() => string), arg: T): T {
+ (arg as any).beamName = name;
+ return arg;
+}
+
+export function extractName<T>(withName: T): string {
+ const untyped = withName as any;
+ if (untyped.beamName != undefined) {
+ if (typeof untyped.beamName == "string") {
+ return untyped.beamName;
+ } else {
+ return untyped.beamName();
+ }
+ } else if (
+ untyped.name != undefined &&
+ untyped.name &&
Review Comment:
```suggestion
untyped.name &&
```
The first check here is superfluous: if `untyped.name` is `undefined`, then
`untyped.name` is already falsy, so the second check alone is enough.
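A quick way to check the suggestion: the loose `!= undefined` rejects only `null` and `undefined`, both of which are already falsy, so in a boolean context such as this `else if` condition the two guards always agree. The sample values below are arbitrary.

```typescript
// Original guard vs. the suggested one, each reduced to a boolean.
const original = (name: unknown) => Boolean(name != undefined && name);
const suggested = (name: unknown) => Boolean(name);

// Loose `!= undefined` filters out only null and undefined (both falsy),
// so the extra check never changes the outcome.
const samples: unknown[] = [undefined, null, "", "Map", 0, 1, NaN, false, {}, () => 0];
for (const s of samples) {
  console.log(String(s), original(s) === suggested(s)); // true for every sample
}
```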
##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+ assignWindows: (Instant) => W[];
+ windowCoder: () => Coder<W>;
+ toProto: () => runnerApi.FunctionSpec;
+ isMerging: () => boolean;
+ assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ static createWindowingStrategy(
+ pipeline: Pipeline,
+ windowFn: WindowFn<any>,
+ windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+ ): runnerApi.WindowingStrategy {
+ let result: runnerApi.WindowingStrategy;
+ if (windowingStrategyBase == undefined) {
+ result = {
+ windowFn: undefined!,
+ windowCoderId: undefined!,
+ mergeStatus: undefined!,
+ assignsToOneWindow: undefined!,
+ trigger: { trigger: { oneofKind: "default", default: {} } },
+ accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+ outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+ closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+ onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+ allowedLateness: BigInt(0),
+ environmentId: pipeline.defaultEnvironment,
+ };
+ } else {
+ result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+ }
+ result.windowFn = windowFn.toProto();
+ result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+ result.mergeStatus = windowFn.isMerging()
+ ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+ : runnerApi.MergeStatus_Enum.NON_MERGING;
+ result.assignsToOneWindow = windowFn.assignsToOneWindow();
+ return result;
+ }
+
+ constructor(
+ private windowFn: WindowFn<W>,
+ private windowingStrategyBase:
+ | runnerApi.WindowingStrategy
+ | undefined = undefined
+ ) {
+ super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_WINDOW_INTO_DOFN_URN,
+ payload: serializeFn({ windowFn: this.windowFn }),
+ }),
+ })
+ ),
+ });
+
+ const inputCoder = pipeline.context.getPCollectionCoderId(input);
+ return pipeline.createPCollectionInternal<T>(
+ inputCoder,
+ WindowInto.createWindowingStrategy(
+ pipeline,
+ this.windowFn,
+ this.windowingStrategyBase
+ )
+ );
+ }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ constructor(private func: (T, Instant) => typeof Instant) {
+ super();
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+ payload: serializeFn({ func: this.func }),
+ }),
+ })
+ ),
+ });
+
Review Comment:
This code is duplicated several times, differing only in the URN; consider
pulling it out into a helper function.
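One possible shape for that refactor, sketched with simplified stand-in types rather than the generated `runnerApi` classes. The interfaces, the helper name `jsDoFnParDoSpec`, and the URN strings are all placeholders. The ParDo URN and the payload encoder are parameters so the sketch runs on its own; in window.ts they would presumably be `ParDo.urn` and `runnerApi.ParDoPayload.toBinary`, with only the DoFn URN (for example `urns.JS_WINDOW_INTO_DOFN_URN`) and the serialized payload varying per transform.

```typescript
// Simplified stand-ins for the generated runnerApi types (assumption: only
// the fields used in window.ts are modeled).
interface FunctionSpec {
  urn: string;
  payload: Uint8Array;
}
interface ParDoPayload {
  doFn: FunctionSpec;
}

// Hypothetical helper: builds the ParDo spec that WindowInto and
// AssignTimestamps currently construct inline.
function jsDoFnParDoSpec(
  parDoUrn: string,
  encodeParDoPayload: (payload: ParDoPayload) => Uint8Array,
  doFnUrn: string,
  doFnPayload: Uint8Array
): FunctionSpec {
  return {
    urn: parDoUrn,
    payload: encodeParDoPayload({ doFn: { urn: doFnUrn, payload: doFnPayload } }),
  };
}

// Usage with a toy encoder that records what it was asked to encode.
const seen: ParDoPayload[] = [];
const fakeEncode = (p: ParDoPayload) => {
  seen.push(p);
  return new Uint8Array([seen.length]);
};
const spec = jsDoFnParDoSpec(
  "example:pardo:v1", // placeholder for ParDo.urn
  fakeEncode,
  "example:js_window_into:v1", // placeholder DoFn URN
  new Uint8Array([42]) // placeholder for serializeFn(...) output
);
console.log(spec.urn, seen[0].doFn.urn);
```

Each `expandInternal` would then shrink to roughly one assignment to `transformProto.spec`, differing only in the DoFn URN and the `serializeFn(...)` argument.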
Issue Time Tracking
-------------------
Worklog Id: (was: 762402)
Time Spent: 3h 50m (was: 3h 40m)
> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
> Key: BEAM-1754
> URL: https://issues.apache.org/jira/browse/BEAM-1754
> Project: Beam
> Issue Type: New Feature
> Components: sdk-ideas
> Reporter: Diego Zuluaga
> Assignee: Kerry Donny-Clark
> Priority: P3
> Labels: node.js
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples
> highly comprehensible. However, I have to admit that most Node.js developers,
> who have little background in typed languages and are used to getting up to
> speed with frameworks incredibly fast, might face a steeper learning curve
> with Dataflow than they (we) are used to. So I wonder whether Dataflow will
> ever provide a Node.js SDK. Maybe this is out of the question, but I wanted
> to run it by the team, as it would be awesome to have something along these
> lines!
> Thanks,
> Diego
> Question originally posted on Stack Overflow:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python
--
This message was sent by Atlassian Jira
(v8.20.7#820007)