[
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=764392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764392
]
ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Apr/22 18:11
Start Date: 29/Apr/22 18:11
Worklog Time Spent: 10m
Work Description: robertwb commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r861994013
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+ ParamProviderImpl,
+ SideInputInfo,
+ createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+ promises: Promise<void>[] = [];
+ add(result: ProcessResult) {
+ if (result != NonPromise) {
+ this.promises.push(result as Promise<void>);
+ }
+ }
+ build(): ProcessResult {
+ if (this.promises.length == 0) {
+ return NonPromise;
+ } else if (this.promises.length == 1) {
+ return this.promises[0];
+ } else {
+ return Promise.all(this.promises).then(() => void null);
+ }
+ }
+}
+
+export interface IOperator {
+ startBundle: () => Promise<void>;
+ // As this is called at every operator at every element, and the vast majority
+ // of the time Promises are not needed, we wish to avoid the overhead of
+ // creating promisses and await as much as possible.
+ process: (wv: WindowedValue<unknown>) => ProcessResult;
+ finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+ constructor(private operators: IOperator[]) {}
+
+ receive(wvalue: WindowedValue<unknown>): ProcessResult {
+ if (this.operators.length == 1) {
+ return this.operators[0].process(wvalue);
+ } else {
+ const result = new ProcessResultBuilder();
+ for (const operator of this.operators) {
+ result.add(operator.process(wvalue));
+ }
+ return result.build();
+ }
+ }
+}
+
+export class OperatorContext {
+ pipelineContext: PipelineContext;
+ constructor(
+ public descriptor: ProcessBundleDescriptor,
+ public getReceiver: (string) => Receiver,
+ public getDataChannel: (string) => MultiplexingDataChannel,
+ public getStateProvider: () => StateProvider,
+ public getBundleId: () => string
+ ) {
+ this.pipelineContext = new PipelineContext(descriptor);
+ }
+}
+
+export function createOperator(
+ transformId: string,
+ context: OperatorContext
+): IOperator {
+ const transform = context.descriptor.transforms[transformId];
+ // Ensure receivers are eagerly created.
+ Object.values(transform.outputs).map(context.getReceiver);
+ let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+ if (operatorConstructor == undefined) {
+ throw new Error("Unknown transform type:" + transform.spec?.urn);
+ }
+ return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+ new (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+ ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
Review Comment:
This doesn't return the operator.
##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc, i) => acc + 1,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc: number, i: number) => acc + i,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a > b ? a : b)),
Review Comment:
Yes. Done.
##########
sdks/typescript/src/apache_beam/examples/wordcount2.ts:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TODO: Should this be in a top-level examples dir, rather than under apache_beam.
+
+import * as beam from "../../apache_beam";
+import * as textio from "../io/textio";
+import { DirectRunner } from "../runners/direct_runner";
+
+import { count } from "../transforms/combiners";
+import { GroupBy } from "../transforms/group_and_combine";
+
+import { PortableRunner } from "../runners/portable_runner/runner";
+
+class CountElements extends beam.PTransform<
+ beam.PCollection<any>,
+ beam.PCollection<any>
+> {
+ expand(input: beam.PCollection<any>) {
+ return input
+ .map((e) => ({ element: e }))
+ .apply(new GroupBy("element").combining("element", count, "count"));
+ }
+}
+
+function wordCount(lines: beam.PCollection<string>): beam.PCollection<any> {
+ return lines
+ .map((s: string) => s.toLowerCase())
+ .flatMap(function* (line: string) {
+ yield* line.split(/[^a-z]+/);
+ })
+ .apply(new CountElements("Count"));
+}
+
+async function main() {
+ // python apache_beam/runners/portability/local_job_service_main.py --port 3333
+ await new PortableRunner("localhost:3333").run(async (root) => {
+ const lines = await root.asyncApply(
+ new textio.ReadFromText("gs://dataflow-samples/shakespeare/kinglear.txt")
Review Comment:
Done.
##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnDataClient,
+ IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+ dataClient: BeamFnDataClient;
+ dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+ consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+ constructor(endpoint: string, workerId: string) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", workerId);
+ this.dataClient = new BeamFnDataClient(
+ endpoint,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.dataChannel = this.dataClient.data(metadata);
+ this.dataChannel.on("data", async (elements) => {
+ console.log("data", elements);
+ for (const data of elements.data) {
+ const consumer = this.getConsumer(data.instructionId, data.transformId);
+ try {
+ await consumer.sendData(data.data);
+ if (data.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ for (const timers of elements.timers) {
+ const consumer = this.getConsumer(
+ timers.instructionId,
+ timers.transformId
+ );
+ try {
+ await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+ if (timers.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ });
+ }
+
+ close() {
+ this.dataChannel.end();
+ }
+
+ async registerConsumer(
+ bundleId: string,
+ transformId: string,
+ consumer: IDataChannel
+ ) {
+ consumer = truncateOnErrorDataChannel(consumer);
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (this.consumers.get(bundleId)!.has(transformId)) {
Review Comment:
This is a two-level mapping; the first `if` makes sure the first level
exists and the second checks the second level.
##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+ start: () => Promise<string>;
+ stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+ constructor(public address: string) {
+ this.address = address;
+ }
+ async start() {
+ return this.address;
+ }
+ async stop() {}
+}
+
+export class SubprocessService {
+ process: ChildProcess;
+ cmd: string;
+ args: string[];
+
+ constructor(cmd: string, args: string[]) {
+ this.cmd = cmd;
+ this.args = args;
+ }
+
+ async start() {
+ // TODO: (Cleanup) Choose a free port.
+ const host = "localhost";
+ const port = "7778";
+ console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+ this.process = childProcess.spawn(
Review Comment:
As I read the documentation, `spawnSync` waits until the process has completed,
which is not what we want here.
##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnDataClient,
+ IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+ dataClient: BeamFnDataClient;
+ dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+ consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+ constructor(endpoint: string, workerId: string) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", workerId);
+ this.dataClient = new BeamFnDataClient(
+ endpoint,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.dataChannel = this.dataClient.data(metadata);
+ this.dataChannel.on("data", async (elements) => {
+ console.log("data", elements);
+ for (const data of elements.data) {
+ const consumer = this.getConsumer(data.instructionId, data.transformId);
+ try {
+ await consumer.sendData(data.data);
+ if (data.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ for (const timers of elements.timers) {
+ const consumer = this.getConsumer(
+ timers.instructionId,
+ timers.transformId
+ );
+ try {
+ await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+ if (timers.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ });
+ }
+
+ close() {
+ this.dataChannel.end();
+ }
+
+ async registerConsumer(
+ bundleId: string,
+ transformId: string,
+ consumer: IDataChannel
+ ) {
+ consumer = truncateOnErrorDataChannel(consumer);
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (this.consumers.get(bundleId)!.has(transformId)) {
+ await (
+ this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+ ).flush(consumer);
+ }
+ this.consumers.get(bundleId)!.set(transformId, consumer);
+ }
+
+ unregisterConsumer(bundleId: string, transformId: string) {
+ this.consumers.get(bundleId)!.delete(transformId);
+ }
+
+ getConsumer(bundleId: string, transformId: string): IDataChannel {
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (!this.consumers.get(bundleId)!.has(transformId)) {
Review Comment:
Same.
##########
sdks/typescript/src/apache_beam/transforms/window.ts:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as runnerApi from "../proto/beam_runner_api";
+import * as urns from "../internal/urns";
+
+import { PTransform } from "./transform";
+import { Coder } from "../coders/coders";
+import { Window } from "../values";
+import { PCollection } from "../pvalue";
+import { Pipeline } from "../internal/pipeline";
+import { ParDo } from "./pardo";
+import { serializeFn } from "../internal/serialize";
+
+export interface WindowFn<W extends Window> {
+ assignWindows: (Instant) => W[];
+ windowCoder: () => Coder<W>;
+ toProto: () => runnerApi.FunctionSpec;
+ isMerging: () => boolean;
+ assignsToOneWindow: () => boolean;
+}
+
+export class WindowInto<T, W extends Window> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ static createWindowingStrategy(
+ pipeline: Pipeline,
+ windowFn: WindowFn<any>,
+ windowingStrategyBase: runnerApi.WindowingStrategy | undefined = undefined
+ ): runnerApi.WindowingStrategy {
+ let result: runnerApi.WindowingStrategy;
+ if (windowingStrategyBase == undefined) {
+ result = {
+ windowFn: undefined!,
+ windowCoderId: undefined!,
+ mergeStatus: undefined!,
+ assignsToOneWindow: undefined!,
+ trigger: { trigger: { oneofKind: "default", default: {} } },
+ accumulationMode: runnerApi.AccumulationMode_Enum.DISCARDING,
+ outputTime: runnerApi.OutputTime_Enum.END_OF_WINDOW,
+ closingBehavior: runnerApi.ClosingBehavior_Enum.EMIT_ALWAYS,
+ onTimeBehavior: runnerApi.OnTimeBehavior_Enum.FIRE_ALWAYS,
+ allowedLateness: BigInt(0),
+ environmentId: pipeline.defaultEnvironment,
+ };
+ } else {
+ result = runnerApi.WindowingStrategy.clone(windowingStrategyBase);
+ }
+ result.windowFn = windowFn.toProto();
+ result.windowCoderId = pipeline.context.getCoderId(windowFn.windowCoder());
+ result.mergeStatus = windowFn.isMerging()
+ ? runnerApi.MergeStatus_Enum.NEEDS_MERGE
+ : runnerApi.MergeStatus_Enum.NON_MERGING;
+ result.assignsToOneWindow = windowFn.assignsToOneWindow();
+ return result;
+ }
+
+ constructor(
+ private windowFn: WindowFn<W>,
+ private windowingStrategyBase:
+ | runnerApi.WindowingStrategy
+ | undefined = undefined
+ ) {
+ super("WindowInto(" + windowFn + ", " + windowingStrategyBase + ")");
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_WINDOW_INTO_DOFN_URN,
+ payload: serializeFn({ windowFn: this.windowFn }),
+ }),
+ })
+ ),
+ });
+
+ const inputCoder = pipeline.context.getPCollectionCoderId(input);
+ return pipeline.createPCollectionInternal<T>(
+ inputCoder,
+ WindowInto.createWindowingStrategy(
+ pipeline,
+ this.windowFn,
+ this.windowingStrategyBase
+ )
+ );
+ }
+}
+
+// TODO: (Cleanup) Add restrictions on moving backwards?
+export class AssignTimestamps<T> extends PTransform<
+ PCollection<T>,
+ PCollection<T>
+> {
+ constructor(private func: (T, Instant) => typeof Instant) {
+ super();
+ }
+
+ expandInternal(
+ input: PCollection<T>,
+ pipeline: Pipeline,
+ transformProto: runnerApi.PTransform
+ ) {
+ transformProto.spec = runnerApi.FunctionSpec.create({
+ urn: ParDo.urn,
+ payload: runnerApi.ParDoPayload.toBinary(
+ runnerApi.ParDoPayload.create({
+ doFn: runnerApi.FunctionSpec.create({
+ urn: urns.JS_ASSIGN_TIMESTAMPS_DOFN_URN,
+ payload: serializeFn({ func: this.func }),
+ }),
+ })
+ ),
+ });
+
Review Comment:
The payloads are different as well. I'm going to leave this as is, as
there's also value in making this quasi-object-literal very transparent.
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+ ParamProviderImpl,
+ SideInputInfo,
+ createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+ promises: Promise<void>[] = [];
+ add(result: ProcessResult) {
+ if (result != NonPromise) {
+ this.promises.push(result as Promise<void>);
+ }
+ }
+ build(): ProcessResult {
+ if (this.promises.length == 0) {
+ return NonPromise;
+ } else if (this.promises.length == 1) {
+ return this.promises[0];
+ } else {
+ return Promise.all(this.promises).then(() => void null);
+ }
+ }
+}
+
+export interface IOperator {
+ startBundle: () => Promise<void>;
+ // As this is called at every operator at every element, and the vast majority
+ // of the time Promises are not needed, we wish to avoid the overhead of
+ // creating promisses and await as much as possible.
+ process: (wv: WindowedValue<unknown>) => ProcessResult;
+ finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+ constructor(private operators: IOperator[]) {}
+
+ receive(wvalue: WindowedValue<unknown>): ProcessResult {
+ if (this.operators.length == 1) {
+ return this.operators[0].process(wvalue);
+ } else {
+ const result = new ProcessResultBuilder();
+ for (const operator of this.operators) {
+ result.add(operator.process(wvalue));
+ }
+ return result.build();
+ }
+ }
+}
+
+export class OperatorContext {
+ pipelineContext: PipelineContext;
+ constructor(
+ public descriptor: ProcessBundleDescriptor,
+ public getReceiver: (string) => Receiver,
+ public getDataChannel: (string) => MultiplexingDataChannel,
+ public getStateProvider: () => StateProvider,
+ public getBundleId: () => string
+ ) {
+ this.pipelineContext = new PipelineContext(descriptor);
+ }
+}
+
+export function createOperator(
+ transformId: string,
+ context: OperatorContext
+): IOperator {
+ const transform = context.descriptor.transforms[transformId];
+ // Ensure receivers are eagerly created.
+ Object.values(transform.outputs).map(context.getReceiver);
+ let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+ if (operatorConstructor == undefined) {
+ throw new Error("Unknown transform type:" + transform.spec?.urn);
+ }
+ return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+ new (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+ ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+ registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+ return new cls(transformId, transformProto, context);
+ });
+}
+
+export function registerOperatorConstructor(
+ urn: string,
+ constructor: OperatorConstructor
+) {
+ operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// potential SDK authors that are less familiar with javascript.
+
+class DataSourceOperator implements IOperator {
+ transformId: string;
+ getBundleId: () => string;
+ multiplexingDataChannel: MultiplexingDataChannel;
+ receiver: Receiver;
+ coder: Coder<WindowedValue<unknown>>;
+ endOfData: Promise<void>;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+ this.multiplexingDataChannel = context.getDataChannel(
+ readPort.apiServiceDescriptor!.url
+ );
+ this.transformId = transformId;
+ this.getBundleId = context.getBundleId;
+ this.receiver = context.getReceiver(
+ onlyElement(Object.values(transform.outputs))
+ );
+ this.coder = context.pipelineContext.getCoder(readPort.coderId);
+ }
+
+ async startBundle() {
+ const this_ = this;
+ var endOfDataResolve, endOfDataReject;
+ this.endOfData = new Promise(async (resolve, reject) => {
+ endOfDataResolve = resolve;
+ endOfDataReject = reject;
+ });
+
+ await this_.multiplexingDataChannel.registerConsumer(
+ this_.getBundleId(),
+ this_.transformId,
+ {
+ sendData: async function (data: Uint8Array) {
+ console.log("Got", data);
+ const reader = new protobufjs.Reader(data);
+ while (reader.pos < reader.len) {
+ const maybePromise = this_.receiver.receive(
+ this_.coder.decode(reader, CoderContext.needsDelimiters)
+ );
+ if (maybePromise != NonPromise) {
+ await maybePromise;
+ }
+ }
+ },
+ sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+ throw Error("Not expecting timers.");
+ },
+ close: function () {
+ endOfDataResolve();
+ },
+ onError: function (error: Error) {
+ endOfDataReject(error);
+ },
+ }
+ );
+ }
+
+ process(wvalue: WindowedValue<unknown>): ProcessResult {
+ throw Error("Data should not come in via process.");
+ }
+
+ async finishBundle() {
+ try {
+ await this.endOfData;
+ } finally {
+ this.multiplexingDataChannel.unregisterConsumer(
+ this.getBundleId(),
+ this.transformId
+ );
+ }
+ }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+ transformId: string;
+ getBundleId: () => string;
+ multiplexingDataChannel: MultiplexingDataChannel;
+ channel: IDataChannel;
+ coder: Coder<WindowedValue<unknown>>;
+ buffer: protobufjs.Writer;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+ this.multiplexingDataChannel = context.getDataChannel(
+ writePort.apiServiceDescriptor!.url
+ );
+ this.transformId = transformId;
+ this.getBundleId = context.getBundleId;
+ this.coder = context.pipelineContext.getCoder(writePort.coderId);
+ }
+
+ async startBundle() {
+ this.channel = this.multiplexingDataChannel.getSendChannel(
+ this.getBundleId(),
+ this.transformId
+ );
+ this.buffer = new protobufjs.Writer();
+ }
+
+ process(wvalue: WindowedValue<unknown>) {
+ this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+ if (this.buffer.len > 1e6) {
+ return this.flush();
+ }
+ return NonPromise;
+ }
+
+ async finishBundle() {
+ await this.flush();
+ this.channel.close();
+ }
+
+ async flush() {
+ if (this.buffer.len > 0) {
+ await this.channel.sendData(this.buffer.finish());
+ this.buffer = new protobufjs.Writer();
+ }
+ }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+ receiver: Receiver;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ this.receiver = context.getReceiver(
+ onlyElement(Object.values(transform.outputs))
+ );
+ }
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ return this.receiver.receive(wvalue);
+ }
+
+ async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+ private doFn: DoFn<unknown, unknown, unknown>;
+ private getStateProvider: () => StateProvider;
+ private sideInputInfo: Map<string, SideInputInfo> = new Map();
+ private originalContext: object | undefined;
+ private augmentedContext: object | undefined;
+ private paramProvider: ParamProviderImpl;
+
+ constructor(
+ private transformId: string,
+ private receiver: Receiver,
+ private spec: runnerApi.ParDoPayload,
+ private payload: {
+ doFn: DoFn<unknown, unknown, unknown>;
+ context: any;
+ },
+ transformProto: runnerApi.PTransform,
+ operatorContext: OperatorContext
+ ) {
+ this.doFn = payload.doFn;
+ this.originalContext = payload.context;
+ this.getStateProvider = operatorContext.getStateProvider;
+ this.sideInputInfo = createSideInputInfo(
+ transformProto,
+ spec,
+ operatorContext
+ );
+ }
+
+ async startBundle() {
+ this.paramProvider = new ParamProviderImpl(
+ this.transformId,
+ this.sideInputInfo,
+ this.getStateProvider
+ );
+ this.augmentedContext = this.paramProvider.augmentContext(
+ this.originalContext
+ );
+ if (this.doFn.startBundle) {
+ this.doFn.startBundle(this.augmentedContext);
+ }
+ }
+
+ process(wvalue: WindowedValue<unknown>) {
+ if (this.augmentedContext && wvalue.windows.length != 1) {
+ // We need to process each window separately.
+ // TODO: (Perf) We could inspect the context more deeply and allow some
+ // cases to go through.
+ const result = new ProcessResultBuilder();
+ for (const window of wvalue.windows) {
+ result.add(
+ this.process({
+ value: wvalue.value,
+ windows: [window],
+ pane: wvalue.pane,
+ timestamp: wvalue.timestamp,
+ })
+ );
+ }
+ return result.build();
+ }
+
+ const this_ = this;
+ function reallyProcess(): ProcessResult {
+ const doFnOutput = this_.doFn.process(
+ wvalue.value,
+ this_.augmentedContext
+ );
+ if (!doFnOutput) {
+ return NonPromise;
+ }
+ const result = new ProcessResultBuilder();
+ for (const element of doFnOutput) {
+ result.add(
+ this_.receiver.receive({
+ value: element,
+ windows: wvalue.windows,
+ pane: wvalue.pane,
+ timestamp: wvalue.timestamp,
+ })
+ );
+ }
+ this_.paramProvider.update(undefined);
+ return result.build();
+ }
+
+ // Update the context with any information specific to this window.
+ const updateContextResult = this.paramProvider.update(wvalue);
+
+ // If we were able to do so without any deferred actions, process the
+ // element immediately.
+ if (updateContextResult == NonPromise) {
+ return reallyProcess();
+ } else {
+ // Otherwise return a promise that first waits for all the deferred
+ // actions to complete and then process the element.
+ return (async () => {
+ await updateContextResult;
+ const update2 = this.paramProvider.update(wvalue);
+ if (update2 != NonPromise) {
+ throw new Error("Expected all promises to be resolved: " + update2);
+ }
+ await reallyProcess();
+ })();
+ }
+ }
+
+ async finishBundle() {
+ if (this.doFn.finishBundle) {
+ const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+ if (!finishBundleOutput) {
+ return;
+ }
+ // The finishBundle method must return `void` or a
Generator<WindowedValue<OutputT>>. It may not
+ // return Generator<OutputT> without windowing information because a
single bundle may contain
+ // elements from different windows, so each element must specify its
window.
+ for (const element of finishBundleOutput) {
+ const maybePromise = this.receiver.receive(element);
+ if (maybePromise != NonPromise) {
+ await maybePromise;
+ }
+ }
+ }
+ }
+}
+
+class IdentityParDoOperator implements IOperator {
+ constructor(private receiver: Receiver) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ return this.receiver.receive(wvalue);
+ }
+
+ async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+ constructor(
+ private splitter: (any) => string,
+ private receivers: { [key: string]: Receiver }
+ ) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const tag = this.splitter(wvalue.value);
+ const receiver = this.receivers[tag];
+ if (receiver) {
+ return receiver.receive(wvalue);
+ } else {
+ // TODO: (API) Make this configurable.
+ throw new Error(
+ "Unexpected tag '" +
+ tag +
+ "' for " +
+ wvalue.value +
+ " not in " +
+ [...Object.keys(this.receivers)]
+ );
+ }
+ }
+
+ async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+ constructor(private receivers: { [key: string]: Receiver }) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const result = new ProcessResultBuilder();
+ // TODO: (API) Should I exactly one instead of allowing a union?
+ for (const tag of Object.keys(wvalue.value as object)) {
+ const receiver = this.receivers[tag];
+ if (receiver) {
+ result.add(
+ receiver.receive({
+ value: (wvalue.value as object)[tag],
+ windows: wvalue.windows,
+ timestamp: wvalue.timestamp,
+ pane: wvalue.pane,
+ })
+ );
+ } else {
+ // TODO: (API) Make this configurable.
+ throw new Error(
+ "Unexpected tag '" +
+ tag +
+ "' for " +
+ wvalue.value +
+ " not in " +
+ [...Object.keys(this.receivers)]
+ );
+ }
+ }
+ return result.build();
+ }
+
+ async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+ constructor(private receiver: Receiver, private windowFn: WindowFn<Window>)
{}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+ if (newWindowsOnce.length > 0) {
+ const newWindows: Window[] = [];
+ for (var i = 0; i < wvalue.windows.length; i++) {
+ newWindows.push(...newWindowsOnce);
Review Comment:
That is correct. Added a comment to clarify.
##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+ start: () => Promise<string>;
+ stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+ constructor(public address: string) {
+ this.address = address;
+ }
+ async start() {
+ return this.address;
+ }
+ async stop() {}
+}
+
+export class SubprocessService {
+ process: ChildProcess;
+ cmd: string;
+ args: string[];
+
+ constructor(cmd: string, args: string[]) {
+ this.cmd = cmd;
+ this.args = args;
+ }
+
+ async start() {
+ // TODO: (Cleanup) Choose a free port.
+ const host = "localhost";
+ const port = "7778";
+ console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+ this.process = childProcess.spawn(
+ this.cmd,
+ this.args.map((arg) => arg.replace("{{PORT}}", port)),
+ {
+ stdio: "inherit",
+ }
+ );
+
+ try {
+ await this.portReady(port, host, 10000);
+ } catch (error) {
+ this.process.kill();
+ throw error;
+ }
+
+ return host + ":" + port;
+ }
+
+ async stop() {
+ this.process.kill();
+ }
+
+ async portReady(port, host, timeoutMs, iterMs = 100) {
+ const start = Date.now();
+ let connected = false;
+ while (!connected && Date.now() - start < timeoutMs) {
+ if (this.process.exitCode) {
+ throw new Error("Aborted with error " + this.process.exitCode);
+ }
+ await new Promise((r) => setTimeout(r, iterMs));
+ try {
+ await new Promise<void>((resolve, reject) => {
+ const socket = net.createConnection(port, host, () => {
+ resolve();
+ socket.end();
+ connected = true;
+ });
+ socket.on("error", (err) => {
+ reject(err);
+ });
+ });
+ } catch (err) {
+ // go around again
+ }
+ }
+ if (!connected) {
+ this.process.kill();
+ throw new Error(
+ "Timed out waiting for service after " + timeoutMs + "ms."
+ );
+ }
+ }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+ gradleTarget: string,
+ args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+ return async () => {
+ return new JavaJarService(
+ await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+ args
+ );
+ };
+}
+
+export class JavaJarService extends SubprocessService {
+ static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+ static BEAM_GROUP_ID = "org.apache.beam";
+ static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+ constructor(jar: string, args: string[] | undefined = undefined) {
+ if (args == undefined) {
+ // TODO: (Extension) Should filesToStage be set at some higher level?
+ args = ["{{PORT}}", "--filesToStage=" + jar];
+ }
+ super("java", ["-jar", jar].concat(args));
+ }
+
+ static async cachedJar(
+ urlOrPath: string,
+ cacheDir: string = JavaJarService.JAR_CACHE
+ ): Promise<string> {
+ if (urlOrPath.match(/^https?:\/\//)) {
+ fs.mkdirSync(cacheDir, { recursive: true });
+ const dest = path.join(
+ JavaJarService.JAR_CACHE,
+ path.basename(urlOrPath)
+ );
+ if (fs.existsSync(dest)) {
+ return dest;
+ }
+ // TODO: (Cleanup) Use true temporary file.
+ const tmp = dest + ".tmp" + Math.random();
+ return new Promise((resolve, reject) => {
+ const fout = fs.createWriteStream(tmp);
+ console.log("Downloading", urlOrPath);
+ const request = https.get(urlOrPath, function (response) {
+ if (response.statusCode !== 200) {
+ reject(
+ `Error code ${response.statusCode} when downloading ${urlOrPath}`
+ );
+ }
+ response.pipe(fout);
+ fout.on("finish", function () {
+ fout.close(() => {
+ fs.renameSync(tmp, dest);
+ resolve(dest);
+ });
+ });
+ });
+ });
+ } else {
+ return urlOrPath;
+ }
+ }
+
+ static gradleToJar(
+ gradleTarget: string,
+ appendix: string | undefined = undefined,
+ version: string = beamVersion
+ ): string {
+ if (version.startsWith("0.")) {
+ // node-ts 0.x corresponds to Beam 2.x.
+ version = "2" + version.substring(1);
+ }
+ version = "2.36.0";
Review Comment:
Yep. Leftover from debugging. Removed.
##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+// npx tsc && npm run worker
+// From sdks/python
+// python trivial_pipeline.py --environment_type=EXTERNAL
--environment_config='localhost:5555' --runner=PortableRunner
--job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnControlClient,
+ IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+ beamFnExternalWorkerPoolDefinition,
+ IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+ MultiplexingStateChannel,
+ CachingStateProvider,
+ GrpcStateProvider,
+ StateProvider,
+} from "./state";
+import {
+ IOperator,
+ Receiver,
+ createOperator,
+ OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+ controlUrl: string;
+}
+
+export class Worker {
+ controlClient: BeamFnControlClient;
+ controlChannel: grpc.ClientDuplexStream<
+ InstructionResponse,
+ InstructionRequest
+ >;
+
+ processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+ bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+ dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+ stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+ constructor(
+ private id: string,
+ private endpoints: WorkerEndpoints,
+ options: Object = {}
+ ) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", this.id);
+ this.controlClient = new BeamFnControlClient(
+ endpoints.controlUrl,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.controlChannel = this.controlClient.control(metadata);
+ this.controlChannel.on("data", async (request) => {
+ console.log(request);
+ if (request.request.oneofKind == "processBundle") {
+ await this.process(request);
+ } else {
+ console.log("Unknown instruction type: ", request);
Review Comment:
Good call on both fronts. Done.
##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+// npx tsc && npm run worker
+// From sdks/python
+// python trivial_pipeline.py --environment_type=EXTERNAL
--environment_config='localhost:5555' --runner=PortableRunner
--job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnControlClient,
+ IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+ beamFnExternalWorkerPoolDefinition,
+ IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+ MultiplexingStateChannel,
+ CachingStateProvider,
+ GrpcStateProvider,
+ StateProvider,
+} from "./state";
+import {
+ IOperator,
+ Receiver,
+ createOperator,
+ OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+ controlUrl: string;
+}
+
+export class Worker {
+ controlClient: BeamFnControlClient;
+ controlChannel: grpc.ClientDuplexStream<
+ InstructionResponse,
+ InstructionRequest
+ >;
+
+ processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+ bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+ dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+ stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+ constructor(
+ private id: string,
+ private endpoints: WorkerEndpoints,
+ options: Object = {}
+ ) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", this.id);
+ this.controlClient = new BeamFnControlClient(
+ endpoints.controlUrl,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.controlChannel = this.controlClient.control(metadata);
+ this.controlChannel.on("data", async (request) => {
+ console.log(request);
+ if (request.request.oneofKind == "processBundle") {
+ await this.process(request);
+ } else {
+ console.log("Unknown instruction type: ", request);
+ }
+ });
+ this.controlChannel.on("end", () => {
+ console.log("Control channel closed.");
+ for (const dataChannel of this.dataChannels.values()) {
+ dataChannel.close();
+ }
+ for (const stateChannel of this.stateChannels.values()) {
+ stateChannel.close();
+ }
+ });
+ }
+
+ async wait() {
+ // TODO: Await closing of control log.
+ await new Promise((r) => setTimeout(r, 1e9));
+ }
+
+ respond(response: InstructionResponse) {
+ this.controlChannel.write(response);
+ }
+
+ async process(request) {
+ const descriptorId =
+ request.request.processBundle.processBundleDescriptorId;
+ console.log("process", request.instructionId, descriptorId);
+ try {
+ if (!this.processBundleDescriptors.has(descriptorId)) {
+ const call = this.controlClient.getProcessBundleDescriptor(
+ {
+ processBundleDescriptorId: descriptorId,
+ },
+ (err, value: ProcessBundleDescriptor) => {
+ if (err) {
+ this.respond({
+ instructionId: request.instructionId,
+ error: "" + err,
+ response: {
+ oneofKind: "processBundle",
+ processBundle: {
+ residualRoots: [],
+ monitoringInfos: [],
+ requiresFinalization: false,
+ monitoringData: {},
+ },
+ },
+ });
+ } else {
+ this.processBundleDescriptors.set(descriptorId, value);
+ this.process(request);
+ }
+ }
+ );
+ return;
+ }
+
+ const processor = this.aquireBundleProcessor(descriptorId);
+ await processor.process(request.instructionId);
+ await this.respond({
+ instructionId: request.instructionId,
+ error: "",
+ response: {
+ oneofKind: "processBundle",
+ processBundle: {
+ residualRoots: [],
+ monitoringInfos: [],
+ requiresFinalization: false,
+ monitoringData: {},
+ },
+ },
+ });
+ this.returnBundleProcessor(processor);
+ } catch (error) {
+ console.error("PROCESS ERROR", error);
+ await this.respond({
+ instructionId: request.instructionId,
+ error: "" + error,
+ response: { oneofKind: undefined },
+ });
+ }
+ }
+
+ aquireBundleProcessor(descriptorId: string) {
+ if (!this.bundleProcessors.has(descriptorId)) {
+ this.bundleProcessors.set(descriptorId, []);
+ }
+ const processor = this.bundleProcessors.get(descriptorId)?.pop();
+ if (processor != undefined) {
+ return processor;
+ } else {
+ return new BundleProcessor(
+ this.processBundleDescriptors.get(descriptorId)!,
+ this.getDataChannel.bind(this),
+ this.getStateChannel.bind(this)
+ );
+ }
+ }
+
+ returnBundleProcessor(processor: BundleProcessor) {
+ this.bundleProcessors.get(processor.descriptor.id)?.push(processor);
Review Comment:
Yes. Done.
##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+ start: () => Promise<string>;
+ stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+ constructor(public address: string) {
+ this.address = address;
+ }
+ async start() {
+ return this.address;
+ }
+ async stop() {}
+}
+
+export class SubprocessService {
+ process: ChildProcess;
+ cmd: string;
+ args: string[];
+
+ constructor(cmd: string, args: string[]) {
+ this.cmd = cmd;
+ this.args = args;
+ }
+
+ async start() {
+ // TODO: (Cleanup) Choose a free port.
+ const host = "localhost";
+ const port = "7778";
+ console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+ this.process = childProcess.spawn(
+ this.cmd,
+ this.args.map((arg) => arg.replace("{{PORT}}", port)),
+ {
+ stdio: "inherit",
+ }
+ );
+
+ try {
+ await this.portReady(port, host, 10000);
+ } catch (error) {
+ this.process.kill();
+ throw error;
+ }
+
+ return host + ":" + port;
+ }
+
+ async stop() {
+ this.process.kill();
+ }
+
+ async portReady(port, host, timeoutMs, iterMs = 100) {
+ const start = Date.now();
+ let connected = false;
+ while (!connected && Date.now() - start < timeoutMs) {
+ if (this.process.exitCode) {
+ throw new Error("Aborted with error " + this.process.exitCode);
+ }
+ await new Promise((r) => setTimeout(r, iterMs));
+ try {
+ await new Promise<void>((resolve, reject) => {
+ const socket = net.createConnection(port, host, () => {
+ resolve();
+ socket.end();
+ connected = true;
+ });
+ socket.on("error", (err) => {
+ reject(err);
+ });
+ });
+ } catch (err) {
+ // go around again
+ }
+ }
+ if (!connected) {
+ this.process.kill();
+ throw new Error(
+ "Timed out waiting for service after " + timeoutMs + "ms."
+ );
+ }
+ }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+ gradleTarget: string,
+ args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+ return async () => {
+ return new JavaJarService(
+ await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+ args
+ );
+ };
+}
+
+export class JavaJarService extends SubprocessService {
+ static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";
+ static BEAM_GROUP_ID = "org.apache.beam";
+ static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+ constructor(jar: string, args: string[] | undefined = undefined) {
+ if (args == undefined) {
+ // TODO: (Extension) Should filesToStage be set at some higher level?
+ args = ["{{PORT}}", "--filesToStage=" + jar];
+ }
+ super("java", ["-jar", jar].concat(args));
+ }
+
+ static async cachedJar(
+ urlOrPath: string,
+ cacheDir: string = JavaJarService.JAR_CACHE
+ ): Promise<string> {
+ if (urlOrPath.match(/^https?:\/\//)) {
+ fs.mkdirSync(cacheDir, { recursive: true });
+ const dest = path.join(
+ JavaJarService.JAR_CACHE,
+ path.basename(urlOrPath)
+ );
+ if (fs.existsSync(dest)) {
+ return dest;
+ }
+ // TODO: (Cleanup) Use true temporary file.
+ const tmp = dest + ".tmp" + Math.random();
+ return new Promise((resolve, reject) => {
+ const fout = fs.createWriteStream(tmp);
+ console.log("Downloading", urlOrPath);
+ const request = https.get(urlOrPath, function (response) {
+ if (response.statusCode !== 200) {
+ reject(
+ `Error code ${response.statusCode} when downloading ${urlOrPath}`
+ );
+ }
+ response.pipe(fout);
+ fout.on("finish", function () {
+ fout.close(() => {
+ fs.renameSync(tmp, dest);
+ resolve(dest);
+ });
+ });
+ });
+ });
+ } else {
+ return urlOrPath;
+ }
+ }
+
+ static gradleToJar(
+ gradleTarget: string,
+ appendix: string | undefined = undefined,
+ version: string = beamVersion
+ ): string {
+ if (version.startsWith("0.")) {
+ // node-ts 0.x corresponds to Beam 2.x.
+ version = "2" + version.substring(1);
+ }
+ version = "2.36.0";
+ const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+ const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+ const projectRoot = path.resolve(
+ __dirname,
+ "..",
+ "..",
+ "..",
+ "..",
+ "..",
+ ".."
+ );
Review Comment:
I agree. For now I added a TODO.
##########
sdks/typescript/boot.go:
##########
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "flag"
+ "log"
+ "os"
+ "strings"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
+)
+
+var (
+ // Contract: https://s.apache.org/beam-fn-api-container-contract.
+
+ id = flag.String("id", "", "Local identifier
(required).")
+ loggingEndpoint = flag.String("logging_endpoint", "", "Local logging
endpoint for FnHarness (required).")
+ artifactEndpoint = flag.String("artifact_endpoint", "", "Local
artifact endpoint for FnHarness (required).")
+ provisionEndpoint = flag.String("provision_endpoint", "", "Local
provision endpoint for FnHarness (required).")
+ controlEndpoint = flag.String("control_endpoint", "", "Local control
endpoint for FnHarness (required).")
+ semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local
semi-persistent directory (optional).")
+)
+
+const entrypoint = "dist/worker/worker_main.js"
+
+func main() {
+ flag.Parse()
+ if *id == "" {
+ log.Fatal("No id provided.")
+ }
+ if *provisionEndpoint == "" {
+ log.Fatal("No provision endpoint provided.")
+ }
+
+ ctx := grpcx.WriteWorkerID(context.Background(), *id)
+
+ info, err := provision.Info(ctx, *provisionEndpoint)
+ if err != nil {
+ log.Fatalf("Failed to obtain provisioning information: %v", err)
+ }
+ log.Printf("Provision info:\n%v", info)
+
+ // TODO(BEAM-8201): Simplify once flags are no longer used.
Review Comment:
I'm not sure if all the cleanup has been done on the Dataflow side. @ihji
##########
sdks/typescript/README.md:
##########
@@ -0,0 +1,208 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Typescript Beam SDK
+
+This is the start of a fully functioning Javascript (actually, Typescript) SDK.
+There are two distinct aims with this SDK
+
+1. Tap into the large (and relatively underserved, by existing data processing
+frameworks) community of javascript developers with a native SDK targeting
this language.
+
+1. Develop a new SDK which can serve both as a proof of concept and reference
+that highlights the (relative) ease of porting Beam to new languages,
+a differentiating feature of Beam and Dataflow.
+
+To accomplish this, we lean heavily on the portability framework.
+For example, we make heavy use of cross-language transforms,
+in particular for IOs.
+In addition, the direct runner is simply an extension of the worker suitable
+for running on portable runners such as the ULR, which will directly transfer
+to running on production runners such as Dataflow and Flink.
+The target audience should hopefully not be put off by running other-language
+code encapsulated in docker images.
+
+## API
+
+We generally try to apply the concepts from the Beam API in a Typescript
+idiomatic way, but it should be noted that few of the initial developers
+have extensive (if any) Javascript/Typescript development experience, so
+feedback is greatly appreciated.
+
+In addition, some notable departures are taken from the traditional SDKs:
+
+* We take a "relational foundations" approach, where
+[schema'd
data](https://docs.google.com/document/d/1tnG2DPHZYbsomvihIpXruUmQ12pHGK0QIvXS1FOTgRc/edit#heading=h.puuotbien1gf)
+is the primary way to interact with data, and we generally eschew the key-value
+requiring transforms in favor of a more flexible approach naming fields or
+expressions. Javascript's native Object is used as the row type.
+
+* As part of being schema-first we also de-emphasize Coders as a first-class
+concept in the SDK, relegating it to an advanced feature used for interop.
+Though we can infer schemas from individual elements, it is still TBD to
+figure out if/how we can leverage the type system and/or function introspection
+to regularly infer schemas at construction time. A fallback coder using BSON
+encoding is used when we don't have sufficient type information.
+
+* We have added additional methods to the PCollection object, notably `map`
+and `flatmap`, [rather than only allowing
apply](https://www.mail-archive.com/[email protected]/msg06035.html).
+In addition, `apply` can accept a function argument `(PCollection) => ...` as
+well as a PTransform subclass, which treats this callable as if it were a
+PTransform's expand.
+
+* In the other direction, we have eliminated the
+[problematic Pipeline object](https://s.apache.org/no-beam-pipeline)
+from the API, instead providing a `Root` PValue on which pipelines are built,
+and invoking run() on a Runner. We offer a less error-prone `Runner.run`
+which finishes only when the pipeline is completely finished as well as
+`Runner.runAsync` which returns a handle to the running pipeline.
+
+* Rather than introduce PCollectionTuple, PCollectionList, etc. we let PValue
+literally be an
+[array or object with PValue
values](https://github.com/robertwb/beam-javascript/blob/de4390dd767f046903ac23fead5db333290462db/sdks/node-ts/src/apache_beam/pvalue.ts#L116)
+which transforms can consume or produce.
+These are applied by wrapping them with the `P` operator, e.g.
+`P([pc1, pc2, pc3]).apply(new Flatten())`.
+
+* Like Python, `flatMap` and `ParDo.process` return multiple elements by
+yielding them from a generator, rather than invoking a passed-in callback.
+TBD how to output to multiple distinct PCollections.
+There is currently an operation to split a PCollection into multiple
+PCollections based on the properties of the elements, and
+we may consider using a callback for side outputs.
+
+* The `map`, `flatmap`, and `ParDo.process` methods take an additional
+(optional) context argument, which is similar to the keyword arguments
+used in Python. These can be "ordinary" javascript objects (which are passed
+as is) or special DoFnParam objects which provide getters to element-specific
+information (such as the current timestamp, window, or side input) at runtime.
+
+* Javascript supports (and encourages) an asynchronous programming model, with
+many libraries requiring use of the async/await paradigm.
+As there is no way (by design) to go from the asynchronous style back to
+the synchronous style, this needs to be taken into account
+when designing the API.
+We currently offer asynchronous variants of `PValue.apply(...)` (in addition
+to the synchronous ones, as they are easier to chain) as well as making
+`Runner.run` asynchronous. TBD to do this for all user callbacks as well.
+
+An example pipeline can be found at
https://github.com/robertwb/beam-javascript/blob/javascript/sdks/node-ts/src/apache_beam/examples/wordcount.ts
+
+## TODO
+
+This SDK is a work in progress. In January 2022 we developed the ability to
+construct and run basic pipelines (including external transforms and running
+on a portable runner) but the following big-ticket items remain.
+
+* Containerization
+
+ * Function and object serialization: we currently only support "loopback"
+ mode; to be able to run in a remote, distributed manner we need to finish up
+ the work in pickling closures and DoFn objects. Some investigation has been
+ started here, but all existing libraries have non-trivial drawbacks.
+
+ * Finish the work in building a full SDK container image that starts
+ the worker.
+
+ * Actually use worker threads for multiple bundles.
+
+* API
+
+ * There are several TODOs of minor features or design decisions to finalize.
+
+ * Consider using (or supporting) 2-arrays rather than {key, value} objects
+ for KVs.
+
+ * Consider renaming map/flatMap to doMap/doFlatMap to avoid confusion with
+ Array.map that takes a key as a second callback argument.
+ Or force the second argument to be an Object, which would lead to a less
+ confusing API and clean up the implementation.
+ Also add a [do]Filter, and possibly a [do]Reduce?
+
+ * Move away from using classes.
+
+ * Add the ability to set good PTransform names, and ideally infer good
+ defaults.
+
+ * Advanced features like metrics, state, timers, and SDF.
+ Possibly some of these can wait.
+
+* Infrastructure
+
+ * Gradle and Jenkins integration for tests and style enforcement.
+
+* Other
+
+ * Enforce unique names for pipeline update.
+
+ * PipelineOptions should be a Javascript Object, not a proto Struct.
+
+ * Though Dataflow Runner v2 supports portability, submission is still done
+ via v1beta3 and interaction with GCS rather than the job submission API.
+
+ * Cleanup uses of var, this. Arrow functions. `===` vs `==`.
+
+ * Avoid `any` return types (and re-enable check in compiler).
+
+ * Relative vs. absolute imports, possibly via setting a base url with a
+ `jsconfig.json`. Also remove imports from base.ts.
+
+ * More/better tests, including tests of illegal/unsupported use.
+
+ * Set channel options like `grpc.max_{send,receive}_message_length` as we
+ do in other SDKs.
+
+ * Reduce use of any.
+
+ * Could use `unknown` in its place where the type is truly unknown.
+
+ * It'd be nice to enforce, maybe re-enable `noImplicitAny: true` in
+ tsconfig if we can get the generated proto files to be ignored.
+
+ * Enable a linter like eslint and fix at least the low hanging fruit.
+
+There is probably more; there are many TODOs littered throughout the code.
+
+This code has also not yet been fully peer reviewed (it was the result of a
+hackathon) which needs to be done before putting it into the main repository.
+
+
+## Development.
Review Comment:
Yes. Excellent.
##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+ start: () => Promise<string>;
+ stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+ constructor(public address: string) {
+ this.address = address;
+ }
+ async start() {
+ return this.address;
+ }
+ async stop() {}
+}
+
+export class SubprocessService {
+ process: ChildProcess;
+ cmd: string;
+ args: string[];
+
+ constructor(cmd: string, args: string[]) {
+ this.cmd = cmd;
+ this.args = args;
+ }
+
+ async start() {
+ // TODO: (Cleanup) Choose a free port.
+ const host = "localhost";
+ const port = "7778";
+ console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+ this.process = childProcess.spawn(
+ this.cmd,
+ this.args.map((arg) => arg.replace("{{PORT}}", port)),
+ {
+ stdio: "inherit",
+ }
+ );
+
+ try {
+ await this.portReady(port, host, 10000);
+ } catch (error) {
+ this.process.kill();
+ throw error;
+ }
+
+ return host + ":" + port;
+ }
+
+ async stop() {
+ this.process.kill();
+ }
+
+ async portReady(port, host, timeoutMs, iterMs = 100) {
+ const start = Date.now();
+ let connected = false;
+ while (!connected && Date.now() - start < timeoutMs) {
+ if (this.process.exitCode) {
+ throw new Error("Aborted with error " + this.process.exitCode);
+ }
+ await new Promise((r) => setTimeout(r, iterMs));
+ try {
+ await new Promise<void>((resolve, reject) => {
+ const socket = net.createConnection(port, host, () => {
+ resolve();
+ socket.end();
+ connected = true;
+ });
+ socket.on("error", (err) => {
+ reject(err);
+ });
+ });
+ } catch (err) {
+ // go around again
+ }
+ }
+ if (!connected) {
+ this.process.kill();
Review Comment:
I agree. Done.
##########
sdks/typescript/src/apache_beam/transforms/combiners.ts:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { CombineFn } from "./group_and_combine";
+
+// TODO(cleanup): These reductions only work on Arrays, not Iterables.
+
+export const count: CombineFn<any, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc, i) => acc + 1,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc) => acc,
+};
+
+export const sum: CombineFn<number, number, number> = {
+ createAccumulator: () => 0,
+ addInput: (acc: number, i: number) => acc + i,
+ mergeAccumulators: (accumulators: number[]) =>
+ accumulators.reduce((prev, current) => prev + current),
+ extractOutput: (acc: number) => acc,
+};
+
+export const max: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc < i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a > b ? a : b)),
+ extractOutput: (acc: any) => acc,
+};
+
+export const min: CombineFn<any, any, any> = {
+ createAccumulator: () => undefined,
+ addInput: (acc: any, i: any) => (acc === undefined || acc > i ? i : acc),
+ mergeAccumulators: (accumulators: any[]) =>
+ accumulators.reduce((a, b) => (a < b ? a : b)),
Review Comment:
Done.
Issue Time Tracking
-------------------
Worklog Id: (was: 764392)
Time Spent: 5h 20m (was: 5h 10m)
> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
> Key: BEAM-1754
> URL: https://issues.apache.org/jira/browse/BEAM-1754
> Project: Beam
> Issue Type: New Feature
> Components: sdk-ideas
> Reporter: Diego Zuluaga
> Assignee: Kerry Donny-Clark
> Priority: P3
> Labels: node.js
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> I like the philosophy behind DataFlow and found the Java and Python samples
> highly comprehensible. However, I have to admit that for most Node.js
> developers who have little background on typed languages and are used to get
> up to speed with frameworks incredibly fast, learning Dataflow might take
> some learning curve that they/we're not used to. So, I wonder if at any point
> in time Dataflow will provide a Node.js SDK. Maybe this is out of the
> question, but I wanted to run it by the team as it would be awesome to have
> something along these lines!
> Thanks,
> Diego
> Question originally posted in SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python
--
This message was sent by Atlassian Jira
(v8.20.7#820007)