[ 
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=762323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762323
 ]

ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 13:53
            Start Date: 26/Apr/22 13:53
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858663543


##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Instead of just logging, should we return an error response here?
   
   Structurally, it would probably be cleaner to move this if/else into the process function as well, so that adding more instruction types doesn't bloat this block.
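   
   For illustration, a minimal sketch of what I have in mind, assuming simplified stand-ins for the generated proto types (the interfaces and the `handleRequest`, `respond`, and `processBundle` names below are hypothetical, not the PR's API). The error response reuses the shape already used in the `catch` block of `process`:
   
   ```typescript
   // Hypothetical, simplified stand-ins for the generated proto types.
   interface InstructionRequest {
     instructionId: string;
     request: { oneofKind: string | undefined };
   }
   interface InstructionResponse {
     instructionId: string;
     error: string;
     response: { oneofKind: undefined };
   }
   
   // Sketch: keep the dispatch in one place. `respond` stands in for
   // Worker.respond and `processBundle` for the existing bundle handling.
   async function handleRequest(
     request: InstructionRequest,
     respond: (response: InstructionResponse) => void,
     processBundle: (request: InstructionRequest) => Promise<void>
   ): Promise<void> {
     switch (request.request.oneofKind) {
       case "processBundle":
         await processBundle(request);
         break;
       default:
         // Report the unhandled instruction to the runner instead of only logging.
         respond({
           instructionId: request.instructionId,
           error: "Unknown instruction type: " + request.request.oneofKind,
           response: { oneofKind: undefined },
         });
     }
   }
   ```
   
   New instruction types would then become extra `case` branches rather than more branches in the constructor's callback.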



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of control log.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   If `this.bundleProcessors.get(processor.descriptor.id)` returns null/undefined, should we create the entry?
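   
   Something along these lines, as a rough sketch only (`PooledProcessor` and `ProcessorPool` are hypothetical stand-ins for `BundleProcessor` and the worker's `bundleProcessors` map):
   
   ```typescript
   // Hypothetical minimal stand-in for BundleProcessor; only the id matters here.
   interface PooledProcessor {
     descriptor: { id: string };
   }
   
   class ProcessorPool {
     private idle: Map<string, PooledProcessor[]> = new Map();
   
     // Returns a processor to the pool, creating the per-descriptor list on
     // demand so the processor is never silently dropped.
     release(processor: PooledProcessor): void {
       const id = processor.descriptor.id;
       let pool = this.idle.get(id);
       if (pool === undefined) {
         pool = [];
         this.idle.set(id, pool);
       }
       pool.push(processor);
     }
   
     // Hands out an idle processor for this descriptor, if there is one.
     acquire(id: string): PooledProcessor | undefined {
       return this.idle.get(id)?.pop();
     }
   }
   ```
   
   With the current `?.push(...)`, a processor whose descriptor has no list yet would just be discarded.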



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   ```suggestion
       } else if (this.consumers.get(bundleId)!.has(transformId)) {
   ```
   
   Nit: we can save a check here, since a map that was just created for `bundleId` cannot already contain `transformId`.



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);

Review Comment:
   ```suggestion
       throw new Error("Unknown transform type:" + transform.spec!.urn);
   ```
   
   The lookup two lines earlier (`transform.spec!.urn!`) already throws if `spec` is null/undefined, so the optional chaining here never takes effect.
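   
   If a readable error for a missing spec is wanted, one option is an explicit check before the lookup. A rough sketch with hypothetical simplified types (`TransformLike`, `lookupOperator`, and the string-valued registry are stand-ins, not the PR's code):
   
   ```typescript
   // Hypothetical simplified shape of a transform proto.
   interface TransformLike {
     spec?: { urn?: string };
   }
   
   // Stand-in registry; the real one maps URNs to operator constructors.
   const registry: Map<string, string> = new Map();
   
   function lookupOperator(transformId: string, transform: TransformLike): string {
     const urn = transform.spec?.urn;
     if (urn === undefined) {
       // Fail with a descriptive message instead of a bare TypeError.
       throw new Error("Transform " + transformId + " has no spec or URN");
     }
     const operator = registry.get(urn);
     if (operator === undefined) {
       throw new Error("Unknown transform type: " + urn);
     }
     return operator;
   }
   ```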



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// potential SDK authors that are less familiar with javascript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise(async (resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a single bundle may contain
+      // elements from different windows, so each element must specify its window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should this require exactly one instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   Could someone help me understand what this block is doing? It seems kinda 
odd to me - is the goal to end up with newWindowsOnce repeated 
`wvalue.windows.length` times, and if so - why?
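
For reference, a minimal sketch of what the quoted loop appears to compute, judging only from the lines visible in the hunk (the rest of `process` is cut off here). `SketchWindow` and `repeatPerExistingWindow` are hypothetical names used for illustration and are not part of the PR.

```typescript
// Hypothetical stand-in for the SDK's Window type (assumption, not the real type).
interface SketchWindow {
  label: string;
}

// Mirrors the visible loop: one copy of newWindowsOnce is appended for every
// window the incoming value already belongs to.
function repeatPerExistingWindow(
  existingWindows: SketchWindow[],
  newWindowsOnce: SketchWindow[]
): SketchWindow[] {
  const newWindows: SketchWindow[] = [];
  for (let i = 0; i < existingWindows.length; i++) {
    newWindows.push(...newWindowsOnce);
  }
  return newWindows;
}

const repeated = repeatPerExistingWindow(
  [{ label: "old-a" }, { label: "old-b" }],
  [{ label: "new-x" }, { label: "new-y" }, { label: "new-z" }]
);
// prints "new-x,new-y,new-z,new-x,new-y,new-z"
console.log(repeated.map((w) => w.label).join(","));
```

With two existing windows and three assigned windows, the result has six entries, which is the repetition the question above is asking about.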



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and awaiting them as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   ```suggestion
   export function registerOperator(urn: string, cls: OperatorClass): IOperator {
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import * as fnApi from "../proto/beam_fn_api";
+import { BeamFnStateClient } from "../proto/beam_fn_api.grpc-client";
+
+// TODO: (Extension) Lazy iteration via continuation tokens.
+// This will likely require promises all the way up to the consumer.
+
+interface PromiseWrapper<T> {
+  type: "promise";
+  promise: Promise<T>;
+}
+
+interface ValueWrapper<T> {
+  type: "value";
+  value: T;
+}
+
+// We want to avoid promises when not needed (e.g. for a cache hit) as they
+// have to bubble all the way up the stack.
+export type MaybePromise<T> = PromiseWrapper<T> | ValueWrapper<T>;
+
+export interface StateProvider {
+  getState: <T>(
+    stateKey: fnApi.StateKey,
+    decode: (data: Uint8Array) => T
+  ) => MaybePromise<T>;
+}
+
+// TODO: (Advanced) Cross-bundle caching.
+export class CachingStateProvider implements StateProvider {
+  underlying: StateProvider;
+  cache: Map<string, MaybePromise<any>> = new Map();
+
+  constructor(underlying: StateProvider) {
+    this.underlying = underlying;
+  }
+
+  getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
+    // TODO: (Perf) Consider caching on something lighter-weight than the full
+    // serialized key, only constructing this proto when interacting with
+    // the runner.
+    const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64"
+    );
+    if (this.cache.has(cacheKey)) {
+      return this.cache.get(cacheKey)!;
+    } else {

Review Comment:
   Nit: It is probably cleaner to drop this else and unindent this block since 
we're early returning in the if
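
A minimal sketch of the shape this nit suggests, assuming a plain `Map` cache. The body of the `else` branch is not visible in the hunk, so `getOrCompute` and its `compute` callback are hypothetical placeholders and not the PR's code.

```typescript
// Hypothetical helper: look a key up in a cache, computing and storing the
// value on a miss. The early return on a hit makes an else branch unnecessary.
function getOrCompute<V>(
  cache: Map<string, V>,
  key: string,
  compute: () => V
): V {
  if (cache.has(key)) {
    return cache.get(key)!;
  }
  // Cache miss: this block sits at the function's top level, not inside an else.
  const value = compute();
  cache.set(key, value);
  return value;
}

let computeCalls = 0;
const sketchCache = new Map<string, number>();
const first = getOrCompute(sketchCache, "k", () => {
  computeCalls++;
  return 42;
});
const second = getOrCompute(sketchCache, "k", () => {
  computeCalls++;
  return 7;
});
console.log(first, second, computeCalls);
```

The second call hits the cache, so the callback runs only once and both calls return the first computed value.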



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   Nit:
   
   ```suggestion
       } else if (!this.consumers.get(bundleId)!.has(transformId)) {
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762323)
    Time Spent: 3h 20m  (was: 3h 10m)

> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
>                 Key: BEAM-1754
>                 URL: https://issues.apache.org/jira/browse/BEAM-1754
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Diego Zuluaga
>            Assignee: Kerry Donny-Clark
>            Priority: P3
>              Labels: node.js
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples 
> highly comprehensible. However, I have to admit that for most Node.js 
> developers, who have little background in typed languages and are used to 
> getting up to speed with frameworks incredibly fast, Dataflow might involve 
> a steeper learning curve than they/we are used to. So, I wonder if at any 
> point in time Dataflow will provide a Node.js SDK. Maybe this is out of the 
> question, but I wanted to run it by the team as it would be awesome to have 
> something along these lines!
> Thanks,
> Diego
> Question originally posted on SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
