[ 
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=764422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764422
 ]

ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Apr/22 19:09
            Start Date: 29/Apr/22 19:09
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r862071859


##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, 
data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   I won't push further to change this since its not logically incorrect, but 
if we enter the first `if` then we set the `bundleId` entry to `new Map()`. 
That in turn means that this second `if` will always evaluate to false when the 
first one evaluates to true.
   
   Basically, they are logically equivalent, but we can save the extra if check 
¯\_(ツ)_/¯ 



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   SGTM





Issue Time Tracking
-------------------

    Worklog Id:     (was: 764422)
    Time Spent: 5.5h  (was: 5h 20m)

> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
>                 Key: BEAM-1754
>                 URL: https://issues.apache.org/jira/browse/BEAM-1754
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Diego Zuluaga
>            Assignee: Kerry Donny-Clark
>            Priority: P3
>              Labels: node.js
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> I like the philosophy behind DataFlow and found the Java and Python samples 
> highly comprehensible. However, I have to admit that for most Node.js 
> developers who have little background on typed languages and are used to get 
> up to speed with frameworks incredibly fast, learning Dataflow might take 
> some learning curve that they/we're not used to. So, I wonder if at any point 
> in time Dataflow will provide a Node.js SDK. Maybe this is out of the 
> question, but I wanted to run it by the team as it would be awesome to have 
> something along these lines!
> Thanks,
> Diego
> Question originaly posted in SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to