[ 
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=762402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762402
 ]

ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 15:43
            Start Date: 26/Apr/22 15:43
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858810856


##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
   Do we need an undefined check here? Since `createAccumulator` returns 
`undefined`, I think either `a` or `b` could still be `undefined` when we merge.
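   One possible shape for that (just a sketch reusing the existing `max` definition, 
not necessarily the fix this PR should take) would be to treat `undefined` as the 
identity when merging:

   ```typescript
   // Sketch only: createAccumulator returns undefined, so an accumulator that
   // never saw any input can reach mergeAccumulators; skip it in the reduce.
   import { CombineFn } from "./group_and_combine";

   export const max: CombineFn<any, any, any> = {
     createAccumulator: () => undefined,
     addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
     mergeAccumulators: (accumulators: any[]) =>
       accumulators.reduce((a, b) =>
         a === undefined ? b : b === undefined ? a : a > b ? a : b
       ),
     extractOutput: (acc: any) => acc,
   };
   ```

   `min` below would presumably want the same guard with the comparison flipped.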



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Same question - do we need to handle undefined?



##########
sdks/typescript/src/apache_beam/transforms/group_and_combine.ts:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { KV } from "../values";
+import { PTransform } from "./transform";
+import { PCollection } from "../pvalue";
+import * as internal from "./internal";
+import { count } from "./combiners";
+
+// TODO: (API) Consider groupBy as a top-level method on PCollections.
+// TBD how to best express the combiners.
+//     - Idea 1: We could allow these as extra arguments to groupBy
+//     - Idea 2: We could return a special GroupedPCollection that has a nice,
+//               chain-able combining() method. We'd want the intermediates to
+//               still be usable, but lazy.
+
+export interface CombineFn<I, A, O> {
+  createAccumulator: () => A;
+  addInput: (A, I) => A;
+  mergeAccumulators: (accumulators: Iterable<A>) => A;
+  extractOutput: (A) => O;
+}
+
+// TODO: (Typescript) When typing this as ((a: I, b: I) => I), types are not inferred well.
+type Combiner<I> = CombineFn<I, any, any> | ((a: any, b: any) => any);
+
+/**
+ * A PTransform that takes a PCollection of elements, and returns a PCollection
+ * of elements grouped by a field, multiple fields, or an expression that is
+ * used as the grouping key.
+ *
+ * @extends PTransform
+ */
+export class GroupBy<T, K> extends PTransform<
+  PCollection<T>,
+  PCollection<KV<K, Iterable<T>>>
+> {
+  keyFn: (element: T) => K;
+  keyNames: string | string[];
+  keyName: string;
+
+  /**
+   * Create a GroupBy transform.
+   *
+   * @param key: The name of the key in the JSON object, or a function that returns the key for a given element.
+   */
+  constructor(
+    key: string | string[] | ((element: T) => K),
+    keyName: string | undefined = undefined
+  ) {
+    super();
+    [this.keyFn, this.keyNames] = extractFnAndName(key, keyName || "key");
+    this.keyName = typeof this.keyNames == "string" ? this.keyNames : "key";
+  }
+
+  expand(input: PCollection<T>): PCollection<KV<K, Iterable<T>>> {
+    const keyFn = this.keyFn;
+    return input
+      .map((x) => ({ key: keyFn(x), value: x }))
+      .apply(new internal.GroupByKey());
+  }
+
+  combining<I>(
+    expr: string | ((element: T) => I),
+    combiner: Combiner<I>,
+    resultName: string
+  ) {
+    return new GroupByAndCombine(this.keyFn, this.keyNames, []).combining(
+      expr,
+      combiner,
+      resultName
+    );
+  }
+}
+
+/**
+ * Groups all elements of the input PCollection together.
+ *
+ * This is generally used with one or more combining specifications, as one
+ * looses parallelization benefits in bringing all elements of a distributed

Review Comment:
   ```suggestion
    * loses parallelization benefits in bringing all elements of a distributed
   ```
   
   Nit



##########
sdks/typescript/src/apache_beam/transforms/transform.ts:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import { PValue } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+
+export function withName<T>(name: string | (() => string), arg: T): T {
+  (arg as any).beamName = name;
+  return arg;
+}
+
+export function extractName<T>(withName: T): string {
+  const untyped = withName as any;
+  if (untyped.beamName != undefined) {
+    if (typeof untyped.beamName == "string") {
+      return untyped.beamName;
+    } else {
+      return untyped.beamName();
+    }
+  } else if (
+    untyped.name != undefined &&
+    untyped.name &&

Review Comment:
   ```suggestion
       untyped.name &&
   ```
   
   The first check here is superfluous: if `untyped.name` is `undefined`, the plain 
`untyped.name` truthiness check already evaluates to false.
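   A tiny illustration (assuming `untyped` is typed as `any` as above): `undefined` 
is already falsy, so the truthiness check alone short-circuits the same way.

   ```typescript
   const untyped: any = {};
   // untyped.name is undefined here; undefined is falsy, so neither branch runs.
   if (untyped.name != undefined && untyped.name) { /* never reached */ }
   if (untyped.name)                              { /* never reached either */ }
   ```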



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   This code is duplicated a number of times, with the only differences being the 
DoFn urn and the payload handed to `serializeFn` - consider pulling it out into 
its own function
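   For reference, one possible shape for such a helper (the name `parDoSpecFor` is 
just a placeholder, and it assumes the imports already present in window.ts):

   ```typescript
   // Hypothetical helper: everything except the DoFn urn and the payload handed
   // to serializeFn is identical across the call sites above.
   function parDoSpecFor(
     doFnUrn: string,
     payload: object
   ): runnerApi.FunctionSpec {
     return runnerApi.FunctionSpec.create({
       urn: ParDo.urn,
       payload: runnerApi.ParDoPayload.toBinary(
         runnerApi.ParDoPayload.create({
           doFn: runnerApi.FunctionSpec.create({
             urn: doFnUrn,
             payload: serializeFn(payload),
           }),
         })
       ),
     });
   }

   // The two expandInternal bodies above could then reduce to something like:
   //   transformProto.spec = parDoSpecFor(urns.JS_WINDOW_INTO_DOFN_URN,
   //                                      { windowFn: this.windowFn });
   //   transformProto.spec = parDoSpecFor(urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
   //                                      { func: this.func });
   ```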





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762402)
    Time Spent: 3h 50m  (was: 3h 40m)

> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
>                 Key: BEAM-1754
>                 URL: https://issues.apache.org/jira/browse/BEAM-1754
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Diego Zuluaga
>            Assignee: Kerry Donny-Clark
>            Priority: P3
>              Labels: node.js
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples 
> highly comprehensible. However, I have to admit that for most Node.js 
> developers, who have little background in typed languages and are used to 
> getting up to speed with frameworks incredibly fast, learning Dataflow might 
> involve a learning curve that they/we're not used to. So, I wonder whether 
> Dataflow will at any point provide a Node.js SDK. Maybe this is out of the 
> question, but I wanted to run it by the team, as it would be awesome to have 
> something along these lines!
> Thanks,
> Diego
> Question originally posted on SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
