[ 
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=764392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764392
 ]

ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Apr/22 18:11
            Start Date: 29/Apr/22 18:11
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r861994013


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {

Review Comment:
   This doesn't return the operator.



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),

Review Comment:
   Yes. Done.



##########
sdks/typescript/src/apache_beam/examples/wordcount2.ts:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+
+import * as beam from "../../apache_beam";
+import * as textio from "../io/textio";
+import { DirectRunner } from "../runners/direct_runner";
+
+import { count } from "../transforms/combiners";
+import { GroupBy } from "../transforms/group_and_combine";
+
+import { PortableRunner } from "../runners/portable_runner/runner";
+
+class CountElements extends beam.PTransform<
+  beam.PCollection<any>,
+  beam.PCollection<any>
+> {
+  expand(input: beam.PCollection<any>) {
+    return input
+      .map((e) => ({ element: e }))
+      .apply(new GroupBy("element").combining("element", count, "count"));
+  }
+}
+
+function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
+  return lines
+    .map((s: string) => s.toLowerCase())
+    .flatMap(function* (line: string) {
+      yield* line.split(/[^a-z]+/);
+    })
+    .apply(new CountElements("Count"));
+}
+
+async function main() {
+  // python apache_beam/runners/portability/local_job_service_main.py --port 3333
+  await new PortableRunner("localhost:3333").run(async (root) => {
+    const lines = await root.asyncApply(
+      new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")

Review Comment:
   Done.



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   This is a two-level mapping; the first `if` makes sure the first level 
exists and the second checks the second level. 



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(

Review Comment:
   As I read the documentation, spawnSync waits until the process has completed, 
which is not what we want here. 



##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnDataClient,
+  IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+  dataClient: BeamFnDataClient;
+  dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+  consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+  constructor(endpoint: string, workerId: string) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", workerId);
+    this.dataClient = new BeamFnDataClient(
+      endpoint,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.dataChannel = this.dataClient.data(metadata);
+    this.dataChannel.on("data", async (elements) => {
+      console.log("data", elements);
+      for (const data of elements.data) {
+        const consumer = this.getConsumer(data.instructionId, data.transformId);
+        try {
+          await consumer.sendData(data.data);
+          if (data.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+      for (const timers of elements.timers) {
+        const consumer = this.getConsumer(
+          timers.instructionId,
+          timers.transformId
+        );
+        try {
+          await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+          if (timers.isLast) {
+            consumer.close();
+          }
+        } catch (error) {
+          consumer.onError(error);
+        }
+      }
+    });
+  }
+
+  close() {
+    this.dataChannel.end();
+  }
+
+  async registerConsumer(
+    bundleId: string,
+    transformId: string,
+    consumer: IDataChannel
+  ) {
+    consumer = truncateOnErrorDataChannel(consumer);
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (this.consumers.get(bundleId)!.has(transformId)) {
+      await (
+        this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+      ).flush(consumer);
+    }
+    this.consumers.get(bundleId)!.set(transformId, consumer);
+  }
+
+  unregisterConsumer(bundleId: string, transformId: string) {
+    this.consumers.get(bundleId)!.delete(transformId);
+  }
+
+  getConsumer(bundleId: string, transformId: string): IDataChannel {
+    if (!this.consumers.has(bundleId)) {
+      this.consumers.set(bundleId, new Map());
+    }
+    if (!this.consumers.get(bundleId)!.has(transformId)) {

Review Comment:
   Same.



##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+  assignWindows: (Instant) => W[];
+  windowCoder: () => Coder<W>;
+  toProto: () => runnerApi.FunctionSpec;
+  isMerging: () => boolean;
+  assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  static createWindowingStrategy(
+    pipeline: Pipeline,
+    windowFn: WindowFn<any>,
+    windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+  ): runnerApi.WindowingStrategy {
+    let result: runnerApi.WindowingStrategy;
+    if (windowingStrategyBase == undefined) {
+      result = {
+        windowFn: undefined!,
+        windowCoderId: undefined!,
+        mergeStatus: undefined!,
+        assignsToOneWindow: undefined!,
+        trigger: { trigger: { oneofKind: "default", default: {} } },
+        accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+        outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+        closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+        onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+        allowedLateness: BigInt(0),
+        environmentId: pipeline.defaultEnvironment,
+      };
+    } else {
+      result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+    }
+    result.windowFn = windowFn.toProto();
+    result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+    result.mergeStatus = windowFn.isMerging()
+      ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+      : runnerApi.MergeStatus_Enum.NON_MERGING;
+    result.assignsToOneWindow = windowFn.assignsToOneWindow();
+    return result;
+  }
+
+  constructor(
+    private windowFn: WindowFn<W>,
+    private windowingStrategyBase:
+      | runnerApi.WindowingStrategy
+      | undefined = undefined
+  ) {
+    super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_WINDOW_INTO_DOFN_URN,
+            payload: serializeFn({ windowFn: this.windowFn }),
+          }),
+        })
+      ),
+    });
+
+    const inputCoder = pipeline.context.getPCollectionCoderId(input);
+    return pipeline.createPCollectionInternal<T>(
+      inputCoder,
+      WindowInto.createWindowingStrategy(
+        pipeline,
+        this.windowFn,
+        this.windowingStrategyBase
+      )
+    );
+  }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+  PCollection<T>,
+  PCollection<T>
+> {
+  constructor(private func: (T, Instant) => typeof Instant) {
+    super();
+  }
+
+  expandInternal(
+    input: PCollection<T>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    transformProto.spec = runnerApi.FunctionSpec.create({
+      urn: ParDo.urn,
+      payload: runnerApi.ParDoPayload.toBinary(
+        runnerApi.ParDoPayload.create({
+          doFn: runnerApi.FunctionSpec.create({
+            urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+            payload: serializeFn({ func: this.func }),
+          }),
+        })
+      ),
+    });
+

Review Comment:
   The payloads are different as well. I'm going to leave this as is, as 
there's also value in making this quasi-object-literal very transparent. 



##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+  ParamProviderImpl,
+  SideInputInfo,
+  createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+  promises: Promise<void>[] = [];
+  add(result: ProcessResult) {
+    if (result != NonPromise) {
+      this.promises.push(result as Promise<void>);
+    }
+  }
+  build(): ProcessResult {
+    if (this.promises.length == 0) {
+      return NonPromise;
+    } else if (this.promises.length == 1) {
+      return this.promises[0];
+    } else {
+      return Promise.all(this.promises).then(() => void null);
+    }
+  }
+}
+
+export interface IOperator {
+  startBundle: () => Promise<void>;
+  // As this is called at every operator at every element, and the vast majority
+  // of the time Promises are not needed, we wish to avoid the overhead of
+  // creating promisses and await as much as possible.
+  process: (wv: WindowedValue<unknown>) => ProcessResult;
+  finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+  constructor(private operators: IOperator[]) {}
+
+  receive(wvalue: WindowedValue<unknown>): ProcessResult {
+    if (this.operators.length == 1) {
+      return this.operators[0].process(wvalue);
+    } else {
+      const result = new ProcessResultBuilder();
+      for (const operator of this.operators) {
+        result.add(operator.process(wvalue));
+      }
+      return result.build();
+    }
+  }
+}
+
+export class OperatorContext {
+  pipelineContext: PipelineContext;
+  constructor(
+    public descriptor: ProcessBundleDescriptor,
+    public getReceiver: (string) => Receiver,
+    public getDataChannel: (string) => MultiplexingDataChannel,
+    public getStateProvider: () => StateProvider,
+    public getBundleId: () => string
+  ) {
+    this.pipelineContext = new PipelineContext(descriptor);
+  }
+}
+
+export function createOperator(
+  transformId: string,
+  context: OperatorContext
+): IOperator {
+  const transform = context.descriptor.transforms[transformId];
+  // Ensure receivers are eagerly created.
+  Object.values(transform.outputs).map(context.getReceiver);
+  let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+  if (operatorConstructor == undefined) {
+    throw new Error("Unknown transform type:" + transform.spec?.urn);
+  }
+  return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+  transformId: string,
+  transformProto: PTransform,
+  context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+  new (
+    transformId: string,
+    transformProto: PTransform,
+    context: OperatorContext
+  ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+  registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+    return new cls(transformId, transformProto, context);
+  });
+}
+
+export function registerOperatorConstructor(
+  urn: string,
+  constructor: OperatorConstructor
+) {
+  operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// potential SDK authors that are less familiar with javascript.
+
+class DataSourceOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  receiver: Receiver;
+  coder: Coder<WindowedValue<unknown>>;
+  endOfData: Promise<void>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      readPort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    this.coder = context.pipelineContext.getCoder(readPort.coderId);
+  }
+
+  async startBundle() {
+    const this_ = this;
+    var endOfDataResolve, endOfDataReject;
+    this.endOfData = new Promise(async (resolve, reject) => {
+      endOfDataResolve = resolve;
+      endOfDataReject = reject;
+    });
+
+    await this_.multiplexingDataChannel.registerConsumer(
+      this_.getBundleId(),
+      this_.transformId,
+      {
+        sendData: async function (data: Uint8Array) {
+          console.log("Got", data);
+          const reader = new protobufjs.Reader(data);
+          while (reader.pos < reader.len) {
+            const maybePromise = this_.receiver.receive(
+              this_.coder.decode(reader, CoderContext.needsDelimiters)
+            );
+            if (maybePromise != NonPromise) {
+              await maybePromise;
+            }
+          }
+        },
+        sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+          throw Error("Not expecting timers.");
+        },
+        close: function () {
+          endOfDataResolve();
+        },
+        onError: function (error: Error) {
+          endOfDataReject(error);
+        },
+      }
+    );
+  }
+
+  process(wvalue: WindowedValue<unknown>): ProcessResult {
+    throw Error("Data should not come in via process.");
+  }
+
+  async finishBundle() {
+    try {
+      await this.endOfData;
+    } finally {
+      this.multiplexingDataChannel.unregisterConsumer(
+        this.getBundleId(),
+        this.transformId
+      );
+    }
+  }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+  transformId: string;
+  getBundleId: () => string;
+  multiplexingDataChannel: MultiplexingDataChannel;
+  channel: IDataChannel;
+  coder: Coder<WindowedValue<unknown>>;
+  buffer: protobufjs.Writer;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+    this.multiplexingDataChannel = context.getDataChannel(
+      writePort.apiServiceDescriptor!.url
+    );
+    this.transformId = transformId;
+    this.getBundleId = context.getBundleId;
+    this.coder = context.pipelineContext.getCoder(writePort.coderId);
+  }
+
+  async startBundle() {
+    this.channel = this.multiplexingDataChannel.getSendChannel(
+      this.getBundleId(),
+      this.transformId
+    );
+    this.buffer = new protobufjs.Writer();
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+    if (this.buffer.len > 1e6) {
+      return this.flush();
+    }
+    return NonPromise;
+  }
+
+  async finishBundle() {
+    await this.flush();
+    this.channel.close();
+  }
+
+  async flush() {
+    if (this.buffer.len > 0) {
+      await this.channel.sendData(this.buffer.finish());
+      this.buffer = new protobufjs.Writer();
+    }
+  }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+  receiver: Receiver;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+  }
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+  private doFn: DoFn<unknown, unknown, unknown>;
+  private getStateProvider: () => StateProvider;
+  private sideInputInfo: Map<string, SideInputInfo> = new Map();
+  private originalContext: object | undefined;
+  private augmentedContext: object | undefined;
+  private paramProvider: ParamProviderImpl;
+
+  constructor(
+    private transformId: string,
+    private receiver: Receiver,
+    private spec: runnerApi.ParDoPayload,
+    private payload: {
+      doFn: DoFn<unknown, unknown, unknown>;
+      context: any;
+    },
+    transformProto: runnerApi.PTransform,
+    operatorContext: OperatorContext
+  ) {
+    this.doFn = payload.doFn;
+    this.originalContext = payload.context;
+    this.getStateProvider = operatorContext.getStateProvider;
+    this.sideInputInfo = createSideInputInfo(
+      transformProto,
+      spec,
+      operatorContext
+    );
+  }
+
+  async startBundle() {
+    this.paramProvider = new ParamProviderImpl(
+      this.transformId,
+      this.sideInputInfo,
+      this.getStateProvider
+    );
+    this.augmentedContext = this.paramProvider.augmentContext(
+      this.originalContext
+    );
+    if (this.doFn.startBundle) {
+      this.doFn.startBundle(this.augmentedContext);
+    }
+  }
+
+  process(wvalue: WindowedValue<unknown>) {
+    if (this.augmentedContext && wvalue.windows.length != 1) {
+      // We need to process each window separately.
+      // TODO: (Perf) We could inspect the context more deeply and allow some
+      // cases to go through.
+      const result = new ProcessResultBuilder();
+      for (const window of wvalue.windows) {
+        result.add(
+          this.process({
+            value: wvalue.value,
+            windows: [window],
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      return result.build();
+    }
+
+    const this_ = this;
+    function reallyProcess(): ProcessResult {
+      const doFnOutput = this_.doFn.process(
+        wvalue.value,
+        this_.augmentedContext
+      );
+      if (!doFnOutput) {
+        return NonPromise;
+      }
+      const result = new ProcessResultBuilder();
+      for (const element of doFnOutput) {
+        result.add(
+          this_.receiver.receive({
+            value: element,
+            windows: wvalue.windows,
+            pane: wvalue.pane,
+            timestamp: wvalue.timestamp,
+          })
+        );
+      }
+      this_.paramProvider.update(undefined);
+      return result.build();
+    }
+
+    // Update the context with any information specific to this window.
+    const updateContextResult = this.paramProvider.update(wvalue);
+
+    // If we were able to do so without any deferred actions, process the
+    // element immediately.
+    if (updateContextResult == NonPromise) {
+      return reallyProcess();
+    } else {
+      // Otherwise return a promise that first waits for all the deferred
+      // actions to complete and then process the element.
+      return (async () => {
+        await updateContextResult;
+        const update2 = this.paramProvider.update(wvalue);
+        if (update2 != NonPromise) {
+          throw new Error("Expected all promises to be resolved: " + update2);
+        }
+        await reallyProcess();
+      })();
+    }
+  }
+
+  async finishBundle() {
+    if (this.doFn.finishBundle) {
+      const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+      if (!finishBundleOutput) {
+        return;
+      }
+      // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+      // return Generator<OutputT> without windowing information because a single bundle may contain
+      // elements from different windows, so each element must specify its window.
+      for (const element of finishBundleOutput) {
+        const maybePromise = this.receiver.receive(element);
+        if (maybePromise != NonPromise) {
+          await maybePromise;
+        }
+      }
+    }
+  }
+}
+
+class IdentityParDoOperator implements IOperator {
+  constructor(private receiver: Receiver) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    return this.receiver.receive(wvalue);
+  }
+
+  async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+  constructor(
+    private splitter: (any) => string,
+    private receivers: { [key: string]: Receiver }
+  ) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const tag = this.splitter(wvalue.value);
+    const receiver = this.receivers[tag];
+    if (receiver) {
+      return receiver.receive(wvalue);
+    } else {
+      // TODO: (API) Make this configurable.
+      throw new Error(
+        "Unexpected tag '" +
+          tag +
+          "' for " +
+          wvalue.value +
+          " not in " +
+          [...Object.keys(this.receivers)]
+      );
+    }
+  }
+
+  async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+  constructor(private receivers: { [key: string]: Receiver }) {}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const result = new ProcessResultBuilder();
+    // TODO: (API) Should I require exactly one tag instead of allowing a union?
+    for (const tag of Object.keys(wvalue.value as object)) {
+      const receiver = this.receivers[tag];
+      if (receiver) {
+        result.add(
+          receiver.receive({
+            value: (wvalue.value as object)[tag],
+            windows: wvalue.windows,
+            timestamp: wvalue.timestamp,
+            pane: wvalue.pane,
+          })
+        );
+      } else {
+        // TODO: (API) Make this configurable.
+        throw new Error(
+          "Unexpected tag '" +
+            tag +
+            "' for " +
+            wvalue.value +
+            " not in " +
+            [...Object.keys(this.receivers)]
+        );
+      }
+    }
+    return result.build();
+  }
+
+  async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+  constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) 
{}
+
+  async startBundle() {}
+
+  process(wvalue: WindowedValue<unknown>) {
+    const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+    if (newWindowsOnce.length > 0) {
+      const newWindows: Window[] = [];
+      for (var i = 0; i < wvalue.windows.length; i++) {
+        newWindows.push(...newWindowsOnce);

Review Comment:
   That is correct. Added a comment to clarify.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";

Review Comment:
   Yep. Leftover from debugging. Removed.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);

Review Comment:
   Good call on both fronts. Done.



##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+//     npx tsc && npm run worker
+// From sdks/python
+//     python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+  ProcessBundleDescriptor,
+  ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+  BeamFnControlClient,
+  IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+  beamFnExternalWorkerPoolDefinition,
+  IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+  MultiplexingStateChannel,
+  CachingStateProvider,
+  GrpcStateProvider,
+  StateProvider,
+} from "./state";
+import {
+  IOperator,
+  Receiver,
+  createOperator,
+  OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+  controlUrl: string;
+}
+
+export class Worker {
+  controlClient: BeamFnControlClient;
+  controlChannel: grpc.ClientDuplexStream<
+    InstructionResponse,
+    InstructionRequest
+  >;
+
+  processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+  bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+  dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+  stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+  constructor(
+    private id: string,
+    private endpoints: WorkerEndpoints,
+    options: Object = {}
+  ) {
+    const metadata = new grpc.Metadata();
+    metadata.add("worker_id", this.id);
+    this.controlClient = new BeamFnControlClient(
+      endpoints.controlUrl,
+      grpc.ChannelCredentials.createInsecure(),
+      {},
+      {}
+    );
+    this.controlChannel = this.controlClient.control(metadata);
+    this.controlChannel.on("data", async (request) => {
+      console.log(request);
+      if (request.request.oneofKind == "processBundle") {
+        await this.process(request);
+      } else {
+        console.log("Unknown instruction type: ", request);
+      }
+    });
+    this.controlChannel.on("end", () => {
+      console.log("Control channel closed.");
+      for (const dataChannel of this.dataChannels.values()) {
+        dataChannel.close();
+      }
+      for (const stateChannel of this.stateChannels.values()) {
+        stateChannel.close();
+      }
+    });
+  }
+
+  async wait() {
+    // TODO: Await closing of control log.
+    await new Promise((r) => setTimeout(r, 1e9));
+  }
+
+  respond(response: InstructionResponse) {
+    this.controlChannel.write(response);
+  }
+
+  async process(request) {
+    const descriptorId =
+      request.request.processBundle.processBundleDescriptorId;
+    console.log("process", request.instructionId, descriptorId);
+    try {
+      if (!this.processBundleDescriptors.has(descriptorId)) {
+        const call = this.controlClient.getProcessBundleDescriptor(
+          {
+            processBundleDescriptorId: descriptorId,
+          },
+          (err, value: ProcessBundleDescriptor) => {
+            if (err) {
+              this.respond({
+                instructionId: request.instructionId,
+                error: "" + err,
+                response: {
+                  oneofKind: "processBundle",
+                  processBundle: {
+                    residualRoots: [],
+                    monitoringInfos: [],
+                    requiresFinalization: false,
+                    monitoringData: {},
+                  },
+                },
+              });
+            } else {
+              this.processBundleDescriptors.set(descriptorId, value);
+              this.process(request);
+            }
+          }
+        );
+        return;
+      }
+
+      const processor = this.aquireBundleProcessor(descriptorId);
+      await processor.process(request.instructionId);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "",
+        response: {
+          oneofKind: "processBundle",
+          processBundle: {
+            residualRoots: [],
+            monitoringInfos: [],
+            requiresFinalization: false,
+            monitoringData: {},
+          },
+        },
+      });
+      this.returnBundleProcessor(processor);
+    } catch (error) {
+      console.error("PROCESS ERROR", error);
+      await this.respond({
+        instructionId: request.instructionId,
+        error: "" + error,
+        response: { oneofKind: undefined },
+      });
+    }
+  }
+
+  aquireBundleProcessor(descriptorId: string) {
+    if (!this.bundleProcessors.has(descriptorId)) {
+      this.bundleProcessors.set(descriptorId, []);
+    }
+    const processor = this.bundleProcessors.get(descriptorId)?.pop();
+    if (processor != undefined) {
+      return processor;
+    } else {
+      return new BundleProcessor(
+        this.processBundleDescriptors.get(descriptorId)!,
+        this.getDataChannel.bind(this),
+        this.getStateChannel.bind(this)
+      );
+    }
+  }
+
+  returnBundleProcessor(processor: BundleProcessor) {
+    this.bundleProcessors.get(processor.descriptor.id)?.push(processor);

Review Comment:
   Yes. Done.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";
+    const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+    const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+    const projectRoot = path.resolve(
+      __dirname,
+      "..",
+      "..",
+      "..",
+      "..",
+      "..",
+      ".."
+    );

Review Comment:
   I agree. For now I added a TODO.



##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "context"
+       "flag"
+       "log"
+       "os"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+       // Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+       id                = flag.String("id", "", "Local identifier 
(required).")
+       loggingEndpoint   = flag.String("logging_endpoint", "", "Local logging 
endpoint for FnHarness (required).")
+       artifactEndpoint  = flag.String("artifact_endpoint", "", "Local 
artifact endpoint for FnHarness (required).")
+       provisionEndpoint = flag.String("provision_endpoint", "", "Local 
provision endpoint for FnHarness (required).")
+       controlEndpoint   = flag.String("control_endpoint", "", "Local control 
endpoint for FnHarness (required).")
+       semiPersistDir    = flag.String("semi_persist_dir", "/tmp", "Local 
semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+       flag.Parse()
+       if *id == "" {
+               log.Fatal("No id provided.")
+       }
+       if *provisionEndpoint == "" {
+               log.Fatal("No provision endpoint provided.")
+       }
+
+       ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+       info, err := provision.Info(ctx, *provisionEndpoint)
+       if err != nil {
+               log.Fatalf("Failed to obtain provisioning information: %v", err)
+       }
+       log.Printf("Provision info:\n%v", info)
+
+       // TODO(BEAM-8201): Simplify once flags are no longer used.

Review Comment:
   I'm not sure if all the cleanup has been done on the Dataflow side. @ihji 



##########
sdks/typescript/README.md:
##########
@@ -0,0 +1,208 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Typescript Beam SDK
+
+This is the start of a fully functioning Javascript (actually, Typescript) SDK.
+There are two distinct aims with this SDK:
+
+1. Tap into the large (and relatively underserved, by existing data processing
+frameworks) community of javascript developers with a native SDK targeting 
this language.
+
+1. Develop a new SDK which can serve both as a proof of concept and reference
+that highlights the (relative) ease of porting Beam to new languages,
+a differentiating feature of Beam and Dataflow.
+
+To accomplish this, we lean heavily on the portability framework.
+For example, we make heavy use of cross-language transforms,
+in particular for IOs.
+In addition, the direct runner is simply an extension of the worker suitable
+for running on portable runners such as the ULR, which will directly transfer
+to running on production runners such as Dataflow and Flink.
+The target audience should hopefully not be put off by running other-language
+code encapsulated in docker images.
+
+## API
+
+We generally try to apply the concepts from the Beam API in a Typescript
+idiomatic way, but it should be noted that few of the initial developers
+have extensive (if any) Javascript/Typescript development experience, so
+feedback is greatly appreciated.
+
+In addition, some notable departures are taken from the traditional SDKs:
+
+* We take a "relational foundations" approach, where
+[schema'd 
data](https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf)
+is the primary way to interact with data, and we generally eschew the key-value
+requiring transforms in favor of a more flexible approach naming fields or
+expressions. Javascript's native Object is used as the row type.
+
+* As part of being schema-first we also de-emphasize Coders as a first-class
+concept in the SDK, relegating it to an advanced feature used for interop.
+Though we can infer schemas from individual elements, it is still TBD to
+figure out if/how we can leverage the type system and/or function introspection
+to regularly infer schemas at construction time. A fallback coder using BSON
+encoding is used when we don't have sufficient type information.
+
+* We have added additional methods to the PCollection object, notably `map`
+and `flatMap`, [rather than only allowing 
apply](https://www.mail-archive.com/[email protected]/msg06035.html).
+In addition, `apply` can accept a function argument `(PCollection) => ...` as
+well as a PTransform subclass, which treats this callable as if it were a
+PTransform's expand.
+
+* In the other direction, we have eliminated the
+[problematic Pipeline object](https://s.apache.org/no-beam-pipeline)
+from the API, instead providing a `Root` PValue on which pipelines are built,
+and invoking run() on a Runner.  We offer a less error-prone `Runner.run`
+which finishes only when the pipeline is completely finished as well as
+`Runner.runAsync` which returns a handle to the running pipeline.
+
+* Rather than introduce PCollectionTuple, PCollectionList, etc. we let PValue
+literally be an
+[array or object with PValue 
values](https://github.com/robertwb/beam-javascript/blob/de4390dd767f046903ac23fead5db333290462db/sdks/node-ts/src/apache_beam/pvalue.ts#L116)
+which transforms can consume or produce.
+These are applied by wrapping them with the `P` operator, e.g.
+`P([pc1, pc2, pc3]).apply(new Flatten())`.
+
+* Like Python, `flatMap` and `ParDo.process` return multiple elements by
+yielding them from a generator, rather than invoking a passed-in callback.
+TBD how to output to multiple distinct PCollections.
+There is currently an operation to split a PCollection into multiple
+PCollections based on the properties of the elements, and
+we may consider using a callback for side outputs.
+
+* The `map`, `flatMap`, and `ParDo.process` methods take an additional
+(optional) context argument, which is similar to the keyword arguments
+used in Python. These can be "ordinary" javascript objects (which are passed
+as is) or special DoFnParam objects which provide getters to element-specific
+information (such as the current timestamp, window, or side input) at runtime.
+
+* Javascript supports (and encourages) an asynchronous programming model, with
+many libraries requiring use of the async/await paradigm.
+As there is no way (by design) to go from the asynchronous style back to
+the synchronous style, this needs to be taken into account
+when designing the API.
+We currently offer asynchronous variants of `PValue.apply(...)` (in addition
+to the synchronous ones, as they are easier to chain) as well as making
+`Runner.run` asynchronous. TBD to do this for all user callbacks as well.
+
+An example pipeline can be found at 
https://github.com/robertwb/beam-javascript/blob/javascript/sdks/node-ts/src/apache_beam/examples/wordcount.ts
+
+## TODO
+
+This SDK is a work in progress. In January 2022 we developed the ability to
+construct and run basic pipelines (including external transforms and running
+on a portable runner) but the following big-ticket items remain.
+
+* Containerization
+
+  * Function and object serialization: we currently only support "loopback"
+  mode; to be able to run in a remote, distributed manner we need to finish up
+  the work in pickling closures and DoFn objects. Some investigation has been
+  started here, but all existing libraries have non-trivial drawbacks.
+
+  * Finish the work in building a full SDK container image that starts
+  the worker.
+
+  * Actually use worker threads for multiple bundles.
+
+* API
+
+  * There are several TODOs of minor features or design decisions to finalize.
+
+    * Consider using (or supporting) 2-arrays rather than {key, value} objects
+      for KVs.
+
+    * Consider renaming map/flatMap to doMap/doFlatMap to avoid confusion with
+    Array.map that takes a key as a second callback argument.
+    Or force the second argument to be an Object, which would lead to a less
+    confusing API and clean up the implementation.
+    Also add a [do]Filter, and possibly a [do]Reduce?
+
+    * Move away from using classes.
+
+  * Add the ability to set good PTransform names, and ideally infer good
+  defaults.
+
+  * Advanced features like metrics, state, timers, and SDF.
+  Possibly some of these can wait.
+
+* Infrastructure
+
+  * Gradle and Jenkins integration for tests and style enforcement.
+
+* Other
+
+  * Enforce unique names for pipeline update.
+
+  * PipelineOptions should be a Javascript Object, not a proto Struct.
+
+  * Though Dataflow Runner v2 supports portability, submission is still done
+  via v1beta3 and interaction with GCS rather than the job submission API.
+
+  * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+
+  * Avoid `any` return types (and re-enable check in compiler).
+
+  * Relative vs. absolute imports, possibly via setting a base url with a
+  `jsconfig.json`.  Also remove imports from base.ts.
+
+  * More/better tests, including tests of illegal/unsupported use.
+
+  * Set channel options like `grpc.max_{send,receive}_message_length` as we
+  do in other SDKs.
+
+  * Reduce use of any.
+
+    * Could use `unknown` in its place where the type is truly unknown.
+
+    * It'd be nice to enforce, maybe re-enable `noImplicitAny: true` in
+    tsconfig if we can get the generated proto files to be ignored.
+
+  * Enable a linter like eslint and fix at least the low hanging fruit.
+
+There is probably more; there are many TODOs littered throughout the code.
+
+This code has also not yet been fully peer reviewed (it was the result of a
+hackathon) which needs to be done before putting it into the main repository.
+
+
+## Development.

Review Comment:
   Yes. Excellent.



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();

Review Comment:
   I agree. Done.



##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc, i) => acc + 1,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+  createAccumulator: () => 0,
+  addInput: (acc: number, i: number) => acc + i,
+  mergeAccumulators: (accumulators: number[]) =>
+    accumulators.reduce((prev, current) => prev + current),
+  extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a > b ? a : b)),
+  extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+  createAccumulator: () => undefined,
+  addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+  mergeAccumulators: (accumulators: any[]) =>
+    accumulators.reduce((a, b) => (a < b ? a : b)),

Review Comment:
   Done.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 764392)
    Time Spent: 5h 20m  (was: 5h 10m)

> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
>                 Key: BEAM-1754
>                 URL: https://issues.apache.org/jira/browse/BEAM-1754
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Diego Zuluaga
>            Assignee: Kerry Donny-Clark
>            Priority: P3
>              Labels: node.js
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> I like the philosophy behind DataFlow and found the Java and Python samples 
> highly comprehensible. However, I have to admit that for most Node.js 
> developers who have little background on typed languages and are used to get 
> up to speed with frameworks incredibly fast, learning Dataflow might take 
> some learning curve that they/we're not used to. So, I wonder if at any point 
> in time Dataflow will provide a Node.js SDK. Maybe this is out of the 
> question, but I wanted to run it by the team as it would be awesome to have 
> something along these lines!
> Thanks,
> Diego
> Question originally posted in SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to