[
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=764422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764422
]
ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Apr/22 19:09
Start Date: 29/Apr/22 19:09
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r862071859
##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnDataClient,
+ IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+ dataClient: BeamFnDataClient;
+ dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+ consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+ constructor(endpoint: string, workerId: string) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", workerId);
+ this.dataClient = new BeamFnDataClient(
+ endpoint,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.dataChannel = this.dataClient.data(metadata);
+ this.dataChannel.on("data", async (elements) => {
+ console.log("data", elements);
+ for (const data of elements.data) {
+ const consumer = this.getConsumer(data.instructionId, data.transformId);
+ try {
+ await consumer.sendData(data.data);
+ if (data.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ for (const timers of elements.timers) {
+ const consumer = this.getConsumer(
+ timers.instructionId,
+ timers.transformId
+ );
+ try {
+ await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+ if (timers.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ });
+ }
+
+ close() {
+ this.dataChannel.end();
+ }
+
+ async registerConsumer(
+ bundleId: string,
+ transformId: string,
+ consumer: IDataChannel
+ ) {
+ consumer = truncateOnErrorDataChannel(consumer);
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (this.consumers.get(bundleId)!.has(transformId)) {
Review Comment:
I won't push further to change this since it's not logically incorrect, but
if we enter the first `if` then we set the `bundleId` entry to `new Map()`.
That in turn means that this second `if` will always evaluate to false when the
first one evaluates to true.
Basically, the two forms are logically equivalent, but we can save the extra
`if` check (see the sketch below) ¯\_(ツ)_/¯
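For illustration, the collapsed form might look like this (a hypothetical
sketch, not code from the PR; it reuses the names from the diff above):

    // Look up the per-bundle map once, creating it if missing. A freshly
    // created map cannot already contain transformId, so the duplicate
    // check only needs to run when the map already existed.
    let bundleConsumers = this.consumers.get(bundleId);
    if (bundleConsumers === undefined) {
      bundleConsumers = new Map();
      this.consumers.set(bundleId, bundleConsumers);
    } else if (bundleConsumers.has(transformId)) {
      // ...existing duplicate-registration handling from the PR...
    }

This shape also avoids the `this.consumers.get(bundleId)!` non-null assertion.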
##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Instant, Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+ assignWindows: (timestamp: Instant) => W[];
+ windowCoder: () => Coder<W>;
+ toProto: () => runnerApi.FunctionSpec;
+ isMerging: () => boolean;
+ assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ static createWindowingStrategy(
+ pipeline: Pipeline,
+ windowFn: WindowFn<any>,
+ windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+ ): runnerApi.WindowingStrategy {
+ let result: runnerApi.WindowingStrategy;
+ if (windowingStrategyBase == undefined) {
+ result = {
+ windowFn: undefined!,
+ windowCoderId: undefined!,
+ mergeStatus: undefined!,
+ assignsToOneWindow: undefined!,
+ trigger: { trigger: { oneofKind: "default", default: {} } },
+ accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+ outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+ closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+ onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+ allowedLateness: BigInt(0),
+ environmentId: pipeline.defaultEnvironment,
+ };
+ } else {
+ result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+ }
+ result.windowFn = windowFn.toProto();
+ result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+ result.mergeStatus = windowFn.isMerging()
+ ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+ : runnerApi.MergeStatus_Enum.NON_MERGING;
+ result.assignsToOneWindow = windowFn.assignsToOneWindow();
+ return result;
+ }
+
+ constructor(
+ private windowFn: WindowFn<W>,
+ private windowingStrategyBase:
+ | runnerApi.WindowingStrategy
+ | undefined = undefined
+ ) {
+ super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_WINDOW_INTO_DOFN_URN,
+ payload: serializeFn({ windowFn: this.windowFn }),
+ }),
+ })
+ ),
+ });
+
+ const inputCoder = pipeline.context.getPCollectionCoderId(input);
+ return pipeline.createPCollectionInternal<T>(
+ inputCoder,
+ WindowInto.createWindowingStrategy(
+ pipeline,
+ this.windowFn,
+ this.windowingStrategyBase
+ )
+ );
+ }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ constructor(private func: (element: T, timestamp: Instant) => Instant) {
+ super();
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+ payload: serializeFn({ func: this.func }),
+ }),
+ })
+ ),
+ });
+
Review Comment:
SGTM
Issue Time Tracking
-------------------
Worklog Id: (was: 764422)
Time Spent: 5.5h (was: 5h 20m)
> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
> Key: BEAM-1754
> URL: https://issues.apache.org/jira/browse/BEAM-1754
> Project: Beam
> Issue Type: New Feature
> Components: sdk-ideas
> Reporter: Diego Zuluaga
> Assignee: Kerry Donny-Clark
> Priority: P3
> Labels: node.js
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples
> highly comprehensible. However, I have to admit that for most Node.js
> developers, who have little background in typed languages and are used to
> getting up to speed with frameworks incredibly fast, learning Dataflow might
> involve a learning curve that they/we're not used to. So, I wonder if at any
> point in time Dataflow will provide a Node.js SDK. Maybe this is out of the
> question, but I wanted to run it by the team, as it would be awesome to have
> something along these lines!
> Thanks,
> Diego
> Question originally posted on SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python
--