damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858810856


##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
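   Do we need an undefined check here? I think either `a` or `b` could be
undefined, since `createAccumulator` returns `undefined` and `a > b` is always
false when either side is undefined.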



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Same question as for `max` - do we need to handle undefined accumulators here?



##########
sdks/typescript/src/apache_beam/transforms/group_and_combine.ts:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { KV } from "../values";
+import { PTransform } from "./transform";
+import { PCollection } from "../pvalue";
+import * as internal from "./internal";
+import { count } from "./combiners";
+
+// TODO: (API) Consider groupBy as a top-level method on PCollections.
+// TBD how to best express the combiners.
+//     - Idea 1: We could allow these as extra arguments to groupBy
+//     - Idea 2: We could return a special GroupedPCollection that has a nice,
+//               chain-able combining() method. We'd want the intermediates to
+//               still be usable, but lazy.
+
+export interface CombineFn<I, A, O> {
+  createAccumulator: () => A;
+  addInput: (A, I) => A;
+  mergeAccumulators: (accumulators: Iterable<A>) => A;
+  extractOutput: (A) => O;
+}
+
+// TODO: (Typescript) When typing this as ((a: I, b: I) => I), types are not inferred well.
+type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
+
+/**
+ * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * of elements grouped by a field, multiple fields, an expression that is used
+ * as the grouping key.
+ *
+ * @extends PTransform
+ */
+export class GroupBy<T, K> extends PTransform<
+  PCollection<T>,
+  PCollection<KV<K, Iterable<T>>>
+> {
+  keyFn: (element: T) => K;
+  keyNames: string | string[];
+  keyName: string;
+
+  /**
+   * Create a GroupBy transform.
+   *
+   * @param key: The name of the key in the JSON object, or a function that returns the key for a given element.
+   */
+  constructor(
+    key: string | string[] | ((element: T) => K),
+    keyName: string | undefined = undefined
+  ) {
+    super();
+    [this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
+    this.keyName = typeof this.keyNames == "string" ? this.keyNames : "key";
+  }
+
+  expand(input: PCollection<T>): PCollection<KV<K, Iterable<T>>> {
+    const keyFn = this.keyFn;
+    return input
+      .map((x) => ({ key: keyFn(x), value: x }))
+      .apply(new internal.GroupByKey());
+  }
+
+  combining<I>(
+    expr: string | ((element: T) => I),
+    combiner: Combiner<I>,
+    resultName: string
+  ) {
+    return new GroupByAndCombine(this.keyFn, this.keyNames, []).combining(
+      expr,
+      combiner,
+      resultName
+    );
+  }
+}
+
+/**
+ * Groups all elements of the input PCollection together.
+ *
+ * This is generally used with one or more combining specifications, as one
+ * looses parallelization benefits in bringing all elements of a distributed

Review Comment:
   ```suggestion
    * loses parallelization benefits in bringing all elements of a distributed
   ```
   
   Nit: typo - "looses" should be "loses".



##########
sdks/typescript/src/apache_beam/transforms/transform.ts:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import { PValue } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+
+export function withName<T>(name: string | (() => string), arg: T): T {
+  (arg as any).beamName = name;
+  return arg;
+}
+
+export function extractName<T>(withName: T): string {
+  const untyped = withName as any;
+  if (untyped.beamName != undefined) {
+    if (typeof untyped.beamName == "string") {
+      return untyped.beamName;
+    } else {
+      return untyped.beamName();
+    }
+  } else if (
+    untyped.name != undefined &&
+    untyped.name &&

Review Comment:
   ```suggestion
       untyped.name &&
   ```
   
   The first check here is superfluous: if `untyped.name` is undefined (or
null), the `untyped.name &&` truthiness check already evaluates to a falsy
value.



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   This code is duplicated a number of times, with the only differences being
the urn and the serialized payload - consider pulling it out into its own
helper function



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to