damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858663543


##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL 
--environment_config='localhost:5555' --runner=PortableRunner 
--job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Instead of just logging, should we return an error response here?
   
   Structurally, it would probably be cleaner to move this if/else into the
   `process` function as well; that way, as we add more options, it doesn't
   end up bloating this block.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL 
--environment_config='localhost:5555' --runner=PortableRunner 
--job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of control log.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   If `this.bundleProcessors.get(processor.descriptor.id)` returns
   null/undefined, the `?.push(...)` is a silent no-op and the processor is
   dropped. Should we create the list in that case instead?



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, 
data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   ```suggestion
       } else if (this.consumers.get(bundleId)!.has(transformId)) {
   ```
   
   Nit: we can save a check here, since a map that was just created for
   `bundleId` cannot already contain `transformId`.



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast 
majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);

Review Comment:
   ```suggestion
       throw new Error("Unknown transform type:" + transform.spec!.urn);
   ```
   
   If `spec` is null/undefined, the code will already have thrown two lines
   earlier (at `transform.spec!.urn!`), so the optional chaining here is
   unnecessary.



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast 
majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// potential SDK authors that are less familiar with javascript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise(async (resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) 
{
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a 
Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a 
single bundle may contain
+      // elements from different windows, so each element must specify its 
window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should I exactly one instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) 
{}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   Could someone help me understand what this block is doing? It seems a bit
   odd to me. Is the goal to end up with `newWindowsOnce` repeated
   `wvalue.windows.length` times, and if so, why?



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+// Sentinel meaning "no promise": ProcessResultBuilder ignores results equal
+// to it and returns it when it has collected no promises.
+export const NonPromise = null;
+
+// What processing an element yields: either null (i.e. NonPromise) or a
+// Promise<void>.
+export type ProcessResult = null | Promise<void>;
+
+/**
+ * Accumulates ProcessResults and collapses them into a single one.
+ */
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+
+  /** Records `result` unless it equals the NonPromise sentinel. */
+  add(result: ProcessResult) {
+    if (result == NonPromise) {
+      return;
+    }
+    this.promises.push(result as Promise<void>);
+  }
+
+  /**
+   * @returns NonPromise when nothing was recorded, the sole promise when
+   *     exactly one was, and otherwise a promise that resolves once all
+   *     recorded promises have resolved.
+   */
+  build(): ProcessResult {
+    switch (this.promises.length) {
+      case 0:
+        return NonPromise;
+      case 1:
+        return this.promises[0];
+      default:
+        return Promise.all(this.promises).then(() => undefined);
+    }
+  }
+}
+
+// Contract for operators: a bundle-start hook, a per-element `process`, and
+// a bundle-finish hook.
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promises and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+/**
+ * Delivers each windowed value to every operator that consumes it.
+ */
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  /**
+   * Passes `wvalue` to each operator's `process`.
+   *
+   * With exactly one operator its result is returned as-is, bypassing the
+   * builder; otherwise the individual results are merged through
+   * ProcessResultBuilder.
+   */
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length != 1) {
+      const merged = new ProcessResultBuilder();
+      this.operators.forEach((op) => merged.add(op.process(wvalue)));
+      return merged.build();
+    }
+    return this.operators[0].process(wvalue);
+  }
+}
+
+/**
+ * Bundle of lookups handed to operator constructors: the bundle descriptor
+ * plus accessors for receivers, data channels, state and the bundle id.
+ */
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+
+  /**
+   * @param descriptor the bundle descriptor; also used to construct
+   *     `pipelineContext`.
+   * @param getReceiver returns the Receiver for a string id (createOperator
+   *     calls it with the values of a transform's `outputs`, presumably
+   *     PCollection ids).
+   * @param getDataChannel returns a data channel for a string key
+   *     (presumably the data endpoint - confirm against callers).
+   * @param getStateProvider returns the StateProvider to use.
+   * @param getBundleId returns a bundle id string.
+   */
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    // `(string) => T` declares a parameter *named* `string` of implicit type
+    // `any`, so the callback parameters are named and typed explicitly here.
+    public getReceiver: (pcollectionId: string) => Receiver,
+    public getDataChannel: (endpoint: string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+/**
+ * Builds the operator registered for the URN of the given transform.
+ *
+ * @param transformId key of the transform in `context.descriptor.transforms`.
+ * @param context supplies the descriptor and the receiver lookup.
+ * @returns the operator produced by the registered constructor.
+ * @throws Error if the transform id is not in the descriptor, or if no
+ *     operator is registered for the transform's URN.
+ */
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  if (transform == undefined) {
+    // Fail with a clear message rather than a TypeError on `.outputs` below.
+    throw new Error("Unknown transform id:" + transformId);
+  }
+  // Ensure receivers are eagerly created. A plain loop is used (rather than
+  // `map`) because only the side effect matters and the callback must not
+  // receive the extra (index, array) arguments.
+  for (const pcollectionId of Object.values(transform.outputs)) {
+    context.getReceiver(pcollectionId);
+  }
+  const urn = transform.spec?.urn;
+  const operatorConstructor =
+    urn == undefined ? undefined : operatorsByUrn.get(urn);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+// Factory signature stored in operatorsByUrn and invoked by createOperator.
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+// Shape of an operator class: constructible with the same three arguments
+// as OperatorConstructor and yielding an IOperator.
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+// Registry from transform URN to the factory that builds its operator;
+// createOperator looks factories up here.
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   ```suggestion
   export function registerOperator(urn: string, cls: OperatorClass): IOperator {
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import * as fnApi from "../proto/beam_fn_api";
+import { BeamFnStateClient } from "../proto/beam_fn_api.grpc-client";
+
+// TODO: (Extension) Lazy iteration via continuation tokens.
+// This will likely require promises all the way up to the consumer.
+
+// MaybePromise variant whose result is only reachable through `promise`.
+interface PromiseWrapper<T> {
+  type: "promise";
+  promise: Promise<T>;
+}
+
+// MaybePromise variant that carries its result directly in `value`.
+interface ValueWrapper<T> {
+  type: "value";
+  value: T;
+}
+
+// We want to avoid promises when not needed (e.g. for a cache hit) as they
+// have to bubble all the way up the stack.
+// The `type` field ("promise" or "value") tells the two variants apart.
+export type MaybePromise<T> = PromiseWrapper<T> | ValueWrapper<T>;
+
+// Source of state values: given a state key and a decoder for raw bytes,
+// yields a decoded T either directly or via a promise (presumably `decode`
+// is applied to the fetched data - confirm against implementations).
+export interface StateProvider {
+  getState: <T>(
+    stateKey: fnApi.StateKey,
+    decode: (data: Uint8Array) => T
+  ) => MaybePromise<T>;
+}
+
+// TODO: (Advanced) Cross-bundle caching.
+export class CachingStateProvider implements StateProvider {
+  underlying: StateProvider;
+  cache: Map<string, MaybePromise<any>> = new Map();
+
+  constructor(underlying: StateProvider) {
+    this.underlying = underlying;
+  }
+
+  getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
+    // TODO: (Perf) Consider caching on something lighter-weight than the full
+    // serialized key, only constructing this proto when interacting with
+    // the runner.
+    const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64"
+    );
+    if (this.cache.has(cacheKey)) {
+      return this.cache.get(cacheKey)!;
+    } else {

Review Comment:
   Nit: It is probably cleaner to drop this `else` and unindent this block,
   since we already return early in the `if`.



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, 
data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   Nit:
   
   ```suggestion
       } else if (!this.consumers.get(bundleId)!.has(transformId)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to