[
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=762323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762323
]
ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 13:53
Start Date: 26/Apr/22 13:53
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858663543
##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// From sdks/node-ts
+// npx tsc && npm run worker
+// From sdks/python
+// python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnControlClient,
+ IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+ beamFnExternalWorkerPoolDefinition,
+ IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+ MultiplexingStateChannel,
+ CachingStateProvider,
+ GrpcStateProvider,
+ StateProvider,
+} from "./state";
+import {
+ IOperator,
+ Receiver,
+ createOperator,
+ OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+ controlUrl: string;
+}
+
+export class Worker {
+ controlClient: BeamFnControlClient;
+ controlChannel: grpc.ClientDuplexStream<
+ InstructionResponse,
+ InstructionRequest
+ >;
+
+ processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+ bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+ dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+ stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+ constructor(
+ private id: string,
+ private endpoints: WorkerEndpoints,
+ options: Object = {}
+ ) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", this.id);
+ this.controlClient = new BeamFnControlClient(
+ endpoints.controlUrl,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.controlChannel = this.controlClient.control(metadata);
+ this.controlChannel.on("data", async (request) => {
+ console.log(request);
+ if (request.request.oneofKind == "processBundle") {
+ await this.process(request);
+ } else {
+ console.log("Unknown instruction type: ", request);
Review Comment:
Instead of just logging, should we return an error response here?
Structurally, it would probably be cleaner to move this if/else into the
process function as well, so that this block doesn't bloat as we add more
options.
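One way to read this suggestion is sketched below: a single handler dispatches on `oneofKind` and answers unknown kinds with an error response rather than only a log line. The `handleRequest` function and the trimmed-down request/response interfaces are hypothetical stand-ins, not the generated proto types; the error response shape mirrors the one the `catch` block in `process` already sends.

```typescript
// Hypothetical, simplified stand-ins for the generated proto types.
interface InstructionRequest {
  instructionId: string;
  request: { oneofKind: string | undefined };
}
interface InstructionResponse {
  instructionId: string;
  error: string;
  response: { oneofKind: undefined };
}

// Dispatch on the instruction kind in one place. Unknown kinds get an
// error response so the runner is not left waiting for a reply.
async function handleRequest(
  request: InstructionRequest,
  processBundle: (request: InstructionRequest) => Promise<void>,
  respond: (response: InstructionResponse) => void
): Promise<void> {
  switch (request.request.oneofKind) {
    case "processBundle":
      await processBundle(request);
      break;
    default:
      respond({
        instructionId: request.instructionId,
        error: "Unknown instruction type: " + request.request.oneofKind,
        response: { oneofKind: undefined },
      });
  }
}
```

New instruction kinds would then become additional `case` arms, leaving the `"data"` callback in the constructor as a single call.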
##########
sdks/typescript/src/apache_beam/worker/worker.ts:
##########
@@ -0,0 +1,360 @@
+
+// From sdks/node-ts
+// npx tsc && npm run worker
+// From sdks/python
+// python trivial_pipeline.py --environment_type=EXTERNAL --environment_config='localhost:5555' --runner=PortableRunner --job_endpoint=embed
+
+import * as grpc from "@grpc/grpc-js";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+
+import { InstructionRequest, InstructionResponse } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnControlClient,
+ IBeamFnControlClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+import {
+ beamFnExternalWorkerPoolDefinition,
+ IBeamFnExternalWorkerPool,
+} from "../proto/beam_fn_api.grpc-server";
+
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import {
+ MultiplexingStateChannel,
+ CachingStateProvider,
+ GrpcStateProvider,
+ StateProvider,
+} from "./state";
+import {
+ IOperator,
+ Receiver,
+ createOperator,
+ OperatorContext,
+} from "./operators";
+
+export interface WorkerEndpoints {
+ controlUrl: string;
+}
+
+export class Worker {
+ controlClient: BeamFnControlClient;
+ controlChannel: grpc.ClientDuplexStream<
+ InstructionResponse,
+ InstructionRequest
+ >;
+
+ processBundleDescriptors: Map<string, ProcessBundleDescriptor> = new Map();
+ bundleProcessors: Map<string, BundleProcessor[]> = new Map();
+ dataChannels: Map<string, MultiplexingDataChannel> = new Map();
+ stateChannels: Map<string, MultiplexingStateChannel> = new Map();
+
+ constructor(
+ private id: string,
+ private endpoints: WorkerEndpoints,
+ options: Object = {}
+ ) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", this.id);
+ this.controlClient = new BeamFnControlClient(
+ endpoints.controlUrl,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.controlChannel = this.controlClient.control(metadata);
+ this.controlChannel.on("data", async (request) => {
+ console.log(request);
+ if (request.request.oneofKind == "processBundle") {
+ await this.process(request);
+ } else {
+ console.log("Unknown instruction type: ", request);
+ }
+ });
+ this.controlChannel.on("end", () => {
+ console.log("Control channel closed.");
+ for (const dataChannel of this.dataChannels.values()) {
+ dataChannel.close();
+ }
+ for (const stateChannel of this.stateChannels.values()) {
+ stateChannel.close();
+ }
+ });
+ }
+
+ async wait() {
+ // TODO: Await closing of control log.
+ await new Promise((r) => setTimeout(r, 1e9));
+ }
+
+ respond(response: InstructionResponse) {
+ this.controlChannel.write(response);
+ }
+
+ async process(request) {
+ const descriptorId =
+ request.request.processBundle.processBundleDescriptorId;
+ console.log("process", request.instructionId, descriptorId);
+ try {
+ if (!this.processBundleDescriptors.has(descriptorId)) {
+ const call = this.controlClient.getProcessBundleDescriptor(
+ {
+ processBundleDescriptorId: descriptorId,
+ },
+ (err, value: ProcessBundleDescriptor) => {
+ if (err) {
+ this.respond({
+ instructionId: request.instructionId,
+ error: "" + err,
+ response: {
+ oneofKind: "processBundle",
+ processBundle: {
+ residualRoots: [],
+ monitoringInfos: [],
+ requiresFinalization: false,
+ monitoringData: {},
+ },
+ },
+ });
+ } else {
+ this.processBundleDescriptors.set(descriptorId, value);
+ this.process(request);
+ }
+ }
+ );
+ return;
+ }
+
+ const processor = this.aquireBundleProcessor(descriptorId);
+ await processor.process(request.instructionId);
+ await this.respond({
+ instructionId: request.instructionId,
+ error: "",
+ response: {
+ oneofKind: "processBundle",
+ processBundle: {
+ residualRoots: [],
+ monitoringInfos: [],
+ requiresFinalization: false,
+ monitoringData: {},
+ },
+ },
+ });
+ this.returnBundleProcessor(processor);
+ } catch (error) {
+ console.error("PROCESS ERROR", error);
+ await this.respond({
+ instructionId: request.instructionId,
+ error: "" + error,
+ response: { oneofKind: undefined },
+ });
+ }
+ }
+
+ aquireBundleProcessor(descriptorId: string) {
+ if (!this.bundleProcessors.has(descriptorId)) {
+ this.bundleProcessors.set(descriptorId, []);
+ }
+ const processor = this.bundleProcessors.get(descriptorId)?.pop();
+ if (processor != undefined) {
+ return processor;
+ } else {
+ return new BundleProcessor(
+ this.processBundleDescriptors.get(descriptorId)!,
+ this.getDataChannel.bind(this),
+ this.getStateChannel.bind(this)
+ );
+ }
+ }
+
+ returnBundleProcessor(processor: BundleProcessor) {
+ this.bundleProcessors.get(processor.descriptor.id)?.push(processor);
Review Comment:
If `this.bundleProcessors.get(processor.descriptor.id)` returns
null/undefined, should we create the entry rather than silently dropping the
processor?
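A minimal sketch of what creating it could look like, assuming a pool keyed by descriptor id. `ProcessorPool`, `PooledProcessor`, `release`, and `acquire` are hypothetical names used only for illustration; they are not part of the PR.

```typescript
// Hypothetical minimal stand-in for BundleProcessor: only the field the
// pool needs.
interface PooledProcessor {
  descriptor: { id: string };
}

class ProcessorPool<T extends PooledProcessor> {
  private pools: Map<string, T[]> = new Map();

  // Returns a processor to its pool, creating the pool on first use so the
  // processor is never silently dropped.
  release(processor: T): void {
    const id = processor.descriptor.id;
    let pool = this.pools.get(id);
    if (pool === undefined) {
      pool = [];
      this.pools.set(id, pool);
    }
    pool.push(processor);
  }

  // Pops a cached processor, or returns undefined if none is available.
  acquire(id: string): T | undefined {
    return this.pools.get(id)?.pop();
  }
}
```

With this shape, `release` works even if `acquire` was never called for that descriptor id.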
##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnDataClient,
+ IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+ dataClient: BeamFnDataClient;
+ dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+ consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+ constructor(endpoint: string, workerId: string) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", workerId);
+ this.dataClient = new BeamFnDataClient(
+ endpoint,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.dataChannel = this.dataClient.data(metadata);
+ this.dataChannel.on("data", async (elements) => {
+ console.log("data", elements);
+ for (const data of elements.data) {
+ const consumer = this.getConsumer(data.instructionId, data.transformId);
+ try {
+ await consumer.sendData(data.data);
+ if (data.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ for (const timers of elements.timers) {
+ const consumer = this.getConsumer(
+ timers.instructionId,
+ timers.transformId
+ );
+ try {
+ await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+ if (timers.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ });
+ }
+
+ close() {
+ this.dataChannel.end();
+ }
+
+ async registerConsumer(
+ bundleId: string,
+ transformId: string,
+ consumer: IDataChannel
+ ) {
+ consumer = truncateOnErrorDataChannel(consumer);
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (this.consumers.get(bundleId)!.has(transformId)) {
Review Comment:
```suggestion
} else if (this.consumers.get(bundleId)!.has(transformId)) {
```
Nit: we can save a check here
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+ ParamProviderImpl,
+ SideInputInfo,
+ createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+ promises: Promise<void>[] = [];
+ add(result: ProcessResult) {
+ if (result != NonPromise) {
+ this.promises.push(result as Promise<void>);
+ }
+ }
+ build(): ProcessResult {
+ if (this.promises.length == 0) {
+ return NonPromise;
+ } else if (this.promises.length == 1) {
+ return this.promises[0];
+ } else {
+ return Promise.all(this.promises).then(() => void null);
+ }
+ }
+}
+
+export interface IOperator {
+ startBundle: () => Promise<void>;
+ // As this is called at every operator at every element, and the vast majority
+ // of the time Promises are not needed, we wish to avoid the overhead of
+ // creating promises and await as much as possible.
+ process: (wv: WindowedValue<unknown>) => ProcessResult;
+ finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+ constructor(private operators: IOperator[]) {}
+
+ receive(wvalue: WindowedValue<unknown>): ProcessResult {
+ if (this.operators.length == 1) {
+ return this.operators[0].process(wvalue);
+ } else {
+ const result = new ProcessResultBuilder();
+ for (const operator of this.operators) {
+ result.add(operator.process(wvalue));
+ }
+ return result.build();
+ }
+ }
+}
+
+export class OperatorContext {
+ pipelineContext: PipelineContext;
+ constructor(
+ public descriptor: ProcessBundleDescriptor,
+ public getReceiver: (string) => Receiver,
+ public getDataChannel: (string) => MultiplexingDataChannel,
+ public getStateProvider: () => StateProvider,
+ public getBundleId: () => string
+ ) {
+ this.pipelineContext = new PipelineContext(descriptor);
+ }
+}
+
+export function createOperator(
+ transformId: string,
+ context: OperatorContext
+): IOperator {
+ const transform = context.descriptor.transforms[transformId];
+ // Ensure receivers are eagerly created.
+ Object.values(transform.outputs).map(context.getReceiver);
+ let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+ if (operatorConstructor == undefined) {
+ throw new Error("Unknown transform type:" + transform.spec?.urn);
Review Comment:
```suggestion
throw new Error("Unknown transform type:" + transform.spec!.urn);
```
This will already have thrown two lines earlier if spec is null/undefined, so the optional chaining here is unnecessary.
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+ ParamProviderImpl,
+ SideInputInfo,
+ createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+ promises: Promise<void>[] = [];
+ add(result: ProcessResult) {
+ if (result != NonPromise) {
+ this.promises.push(result as Promise<void>);
+ }
+ }
+ build(): ProcessResult {
+ if (this.promises.length == 0) {
+ return NonPromise;
+ } else if (this.promises.length == 1) {
+ return this.promises[0];
+ } else {
+ return Promise.all(this.promises).then(() => void null);
+ }
+ }
+}
+
+export interface IOperator {
+ startBundle: () => Promise<void>;
+ // As this is called at every operator at every element, and the vast majority
+ // of the time Promises are not needed, we wish to avoid the overhead of
+ // creating promises and await as much as possible.
+ process: (wv: WindowedValue<unknown>) => ProcessResult;
+ finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+ constructor(private operators: IOperator[]) {}
+
+ receive(wvalue: WindowedValue<unknown>): ProcessResult {
+ if (this.operators.length == 1) {
+ return this.operators[0].process(wvalue);
+ } else {
+ const result = new ProcessResultBuilder();
+ for (const operator of this.operators) {
+ result.add(operator.process(wvalue));
+ }
+ return result.build();
+ }
+ }
+}
+
+export class OperatorContext {
+ pipelineContext: PipelineContext;
+ constructor(
+ public descriptor: ProcessBundleDescriptor,
+ public getReceiver: (string) => Receiver,
+ public getDataChannel: (string) => MultiplexingDataChannel,
+ public getStateProvider: () => StateProvider,
+ public getBundleId: () => string
+ ) {
+ this.pipelineContext = new PipelineContext(descriptor);
+ }
+}
+
+export function createOperator(
+ transformId: string,
+ context: OperatorContext
+): IOperator {
+ const transform = context.descriptor.transforms[transformId];
+ // Ensure receivers are eagerly created.
+ Object.values(transform.outputs).map(context.getReceiver);
+ let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+ if (operatorConstructor == undefined) {
+ throw new Error("Unknown transform type:" + transform.spec?.urn);
+ }
+ return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+ new (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+ ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
+ registerOperatorConstructor(urn, (transformId, transformProto, context) => {
+ return new cls(transformId, transformProto, context);
+ });
+}
+
+export function registerOperatorConstructor(
+ urn: string,
+ constructor: OperatorConstructor
+) {
+ operatorsByUrn.set(urn, constructor);
+}
+
+////////// Actual operator implementation. //////////
+
+// NOTE: It may have been more idiomatic to use objects in closures satisfying
+// the IOperator interface here, but classes are used to make a clearer pattern
+// for potential SDK authors that are less familiar with JavaScript.
+
+class DataSourceOperator implements IOperator {
+ transformId: string;
+ getBundleId: () => string;
+ multiplexingDataChannel: MultiplexingDataChannel;
+ receiver: Receiver;
+ coder: Coder<WindowedValue<unknown>>;
+ endOfData: Promise<void>;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ const readPort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+ this.multiplexingDataChannel = context.getDataChannel(
+ readPort.apiServiceDescriptor!.url
+ );
+ this.transformId = transformId;
+ this.getBundleId = context.getBundleId;
+ this.receiver = context.getReceiver(
+ onlyElement(Object.values(transform.outputs))
+ );
+ this.coder = context.pipelineContext.getCoder(readPort.coderId);
+ }
+
+ async startBundle() {
+ const this_ = this;
+ var endOfDataResolve, endOfDataReject;
+ this.endOfData = new Promise(async (resolve, reject) => {
+ endOfDataResolve = resolve;
+ endOfDataReject = reject;
+ });
+
+ await this_.multiplexingDataChannel.registerConsumer(
+ this_.getBundleId(),
+ this_.transformId,
+ {
+ sendData: async function (data: Uint8Array) {
+ console.log("Got", data);
+ const reader = new protobufjs.Reader(data);
+ while (reader.pos < reader.len) {
+ const maybePromise = this_.receiver.receive(
+ this_.coder.decode(reader, CoderContext.needsDelimiters)
+ );
+ if (maybePromise != NonPromise) {
+ await maybePromise;
+ }
+ }
+ },
+ sendTimers: async function (timerFamilyId: string, timers: Uint8Array) {
+ throw Error("Not expecting timers.");
+ },
+ close: function () {
+ endOfDataResolve();
+ },
+ onError: function (error: Error) {
+ endOfDataReject(error);
+ },
+ }
+ );
+ }
+
+ process(wvalue: WindowedValue<unknown>): ProcessResult {
+ throw Error("Data should not come in via process.");
+ }
+
+ async finishBundle() {
+ try {
+ await this.endOfData;
+ } finally {
+ this.multiplexingDataChannel.unregisterConsumer(
+ this.getBundleId(),
+ this.transformId
+ );
+ }
+ }
+}
+
+registerOperator("beam:runner:source:v1", DataSourceOperator);
+
+class DataSinkOperator implements IOperator {
+ transformId: string;
+ getBundleId: () => string;
+ multiplexingDataChannel: MultiplexingDataChannel;
+ channel: IDataChannel;
+ coder: Coder<WindowedValue<unknown>>;
+ buffer: protobufjs.Writer;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ const writePort = RemoteGrpcPort.fromBinary(transform.spec!.payload);
+ this.multiplexingDataChannel = context.getDataChannel(
+ writePort.apiServiceDescriptor!.url
+ );
+ this.transformId = transformId;
+ this.getBundleId = context.getBundleId;
+ this.coder = context.pipelineContext.getCoder(writePort.coderId);
+ }
+
+ async startBundle() {
+ this.channel = this.multiplexingDataChannel.getSendChannel(
+ this.getBundleId(),
+ this.transformId
+ );
+ this.buffer = new protobufjs.Writer();
+ }
+
+ process(wvalue: WindowedValue<unknown>) {
+ this.coder.encode(wvalue, this.buffer, CoderContext.needsDelimiters);
+ if (this.buffer.len > 1e6) {
+ return this.flush();
+ }
+ return NonPromise;
+ }
+
+ async finishBundle() {
+ await this.flush();
+ this.channel.close();
+ }
+
+ async flush() {
+ if (this.buffer.len > 0) {
+ await this.channel.sendData(this.buffer.finish());
+ this.buffer = new protobufjs.Writer();
+ }
+ }
+}
+
+registerOperator("beam:runner:sink:v1", DataSinkOperator);
+
+class FlattenOperator implements IOperator {
+ receiver: Receiver;
+
+ constructor(
+ transformId: string,
+ transform: PTransform,
+ context: OperatorContext
+ ) {
+ this.receiver = context.getReceiver(
+ onlyElement(Object.values(transform.outputs))
+ );
+ }
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ return this.receiver.receive(wvalue);
+ }
+
+ async finishBundle() {}
+}
+
+registerOperator("beam:transform:flatten:v1", FlattenOperator);
+
+class GenericParDoOperator implements IOperator {
+ private doFn: DoFn<unknown, unknown, unknown>;
+ private getStateProvider: () => StateProvider;
+ private sideInputInfo: Map<string, SideInputInfo> = new Map();
+ private originalContext: object | undefined;
+ private augmentedContext: object | undefined;
+ private paramProvider: ParamProviderImpl;
+
+ constructor(
+ private transformId: string,
+ private receiver: Receiver,
+ private spec: runnerApi.ParDoPayload,
+ private payload: {
+ doFn: DoFn<unknown, unknown, unknown>;
+ context: any;
+ },
+ transformProto: runnerApi.PTransform,
+ operatorContext: OperatorContext
+ ) {
+ this.doFn = payload.doFn;
+ this.originalContext = payload.context;
+ this.getStateProvider = operatorContext.getStateProvider;
+ this.sideInputInfo = createSideInputInfo(
+ transformProto,
+ spec,
+ operatorContext
+ );
+ }
+
+ async startBundle() {
+ this.paramProvider = new ParamProviderImpl(
+ this.transformId,
+ this.sideInputInfo,
+ this.getStateProvider
+ );
+ this.augmentedContext = this.paramProvider.augmentContext(
+ this.originalContext
+ );
+ if (this.doFn.startBundle) {
+ this.doFn.startBundle(this.augmentedContext);
+ }
+ }
+
+ process(wvalue: WindowedValue<unknown>) {
+ if (this.augmentedContext && wvalue.windows.length != 1) {
+ // We need to process each window separately.
+ // TODO: (Perf) We could inspect the context more deeply and allow some
+ // cases to go through.
+ const result = new ProcessResultBuilder();
+ for (const window of wvalue.windows) {
+ result.add(
+ this.process({
+ value: wvalue.value,
+ windows: [window],
+ pane: wvalue.pane,
+ timestamp: wvalue.timestamp,
+ })
+ );
+ }
+ return result.build();
+ }
+
+ const this_ = this;
+ function reallyProcess(): ProcessResult {
+ const doFnOutput = this_.doFn.process(
+ wvalue.value,
+ this_.augmentedContext
+ );
+ if (!doFnOutput) {
+ return NonPromise;
+ }
+ const result = new ProcessResultBuilder();
+ for (const element of doFnOutput) {
+ result.add(
+ this_.receiver.receive({
+ value: element,
+ windows: wvalue.windows,
+ pane: wvalue.pane,
+ timestamp: wvalue.timestamp,
+ })
+ );
+ }
+ this_.paramProvider.update(undefined);
+ return result.build();
+ }
+
+ // Update the context with any information specific to this window.
+ const updateContextResult = this.paramProvider.update(wvalue);
+
+ // If we were able to do so without any deferred actions, process the
+ // element immediately.
+ if (updateContextResult == NonPromise) {
+ return reallyProcess();
+ } else {
+ // Otherwise return a promise that first waits for all the deferred
+ // actions to complete and then process the element.
+ return (async () => {
+ await updateContextResult;
+ const update2 = this.paramProvider.update(wvalue);
+ if (update2 != NonPromise) {
+ throw new Error("Expected all promises to be resolved: " + update2);
+ }
+ await reallyProcess();
+ })();
+ }
+ }
+
+ async finishBundle() {
+ if (this.doFn.finishBundle) {
+ const finishBundleOutput = this.doFn.finishBundle(this.augmentedContext);
+ if (!finishBundleOutput) {
+ return;
+ }
+ // The finishBundle method must return `void` or a Generator<WindowedValue<OutputT>>. It may not
+ // return Generator<OutputT> without windowing information because a single bundle may contain
+ // elements from different windows, so each element must specify its window.
+ for (const element of finishBundleOutput) {
+ const maybePromise = this.receiver.receive(element);
+ if (maybePromise != NonPromise) {
+ await maybePromise;
+ }
+ }
+ }
+ }
+}
+
+class IdentityParDoOperator implements IOperator {
+ constructor(private receiver: Receiver) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ return this.receiver.receive(wvalue);
+ }
+
+ async finishBundle() {}
+}
+
+class SplittingDoFnOperator implements IOperator {
+ constructor(
+ private splitter: (any) => string,
+ private receivers: { [key: string]: Receiver }
+ ) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const tag = this.splitter(wvalue.value);
+ const receiver = this.receivers[tag];
+ if (receiver) {
+ return receiver.receive(wvalue);
+ } else {
+ // TODO: (API) Make this configurable.
+ throw new Error(
+ "Unexpected tag '" +
+ tag +
+ "' for " +
+ wvalue.value +
+ " not in " +
+ [...Object.keys(this.receivers)]
+ );
+ }
+ }
+
+ async finishBundle() {}
+}
+
+class Splitting2DoFnOperator implements IOperator {
+ constructor(private receivers: { [key: string]: Receiver }) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const result = new ProcessResultBuilder();
+ // TODO: (API) Should this require exactly one tag instead of allowing a union?
+ for (const tag of Object.keys(wvalue.value as object)) {
+ const receiver = this.receivers[tag];
+ if (receiver) {
+ result.add(
+ receiver.receive({
+ value: (wvalue.value as object)[tag],
+ windows: wvalue.windows,
+ timestamp: wvalue.timestamp,
+ pane: wvalue.pane,
+ })
+ );
+ } else {
+ // TODO: (API) Make this configurable.
+ throw new Error(
+ "Unexpected tag '" +
+ tag +
+ "' for " +
+ wvalue.value +
+ " not in " +
+ [...Object.keys(this.receivers)]
+ );
+ }
+ }
+ return result.build();
+ }
+
+ async finishBundle() {}
+}
+
+class AssignWindowsParDoOperator implements IOperator {
+ constructor(private receiver: Receiver, private windowFn: WindowFn<Window>) {}
+
+ async startBundle() {}
+
+ process(wvalue: WindowedValue<unknown>) {
+ const newWindowsOnce = this.windowFn.assignWindows(wvalue.timestamp);
+ if (newWindowsOnce.length > 0) {
+ const newWindows: Window[] = [];
+ for (var i = 0; i < wvalue.windows.length; i++) {
+ newWindows.push(...newWindowsOnce);
Review Comment:
Could someone help me understand what this block is doing? It seems kinda
odd to me - is the goal to end up with newWindowsOnce repeated
`wvalue.windows.length` times, and if so - why?
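To make the question concrete, here is the quoted loop isolated into a standalone function. `repeatWindows` is a hypothetical name, and windows are modelled as plain strings purely for illustration. It only shows what the loop computes as written, not why.

```typescript
// Hypothetical stand-in: windows are represented as plain strings here.
// Mirrors the quoted loop: append newWindowsOnce once per input window.
function repeatWindows(
  newWindowsOnce: string[],
  inputWindowCount: number
): string[] {
  const newWindows: string[] = [];
  for (let i = 0; i < inputWindowCount; i++) {
    newWindows.push(...newWindowsOnce);
  }
  return newWindows;
}
```

With two assigned windows and three input windows, `repeatWindows(["a", "b"], 3)` yields six entries, so the result length is the product of the two counts.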
##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -0,0 +1,620 @@
+
+import * as protobufjs from "protobufjs";
+
+import { PTransform, PCollection } from "../proto/beam_runner_api";
+import * as runnerApi from "../proto/beam_runner_api";
+import * as fnApi from "../proto/beam_fn_api";
+import { ProcessBundleDescriptor, RemoteGrpcPort } from "../proto/beam_fn_api";
+import { MultiplexingDataChannel, IDataChannel } from "./data";
+import { StateProvider } from "./state";
+
+import * as urns from "../internal/urns";
+import { PipelineContext } from "../internal/pipeline";
+import { deserializeFn } from "../internal/serialize";
+import { Coder, Context as CoderContext } from "../coders/coders";
+import { Window, Instant, PaneInfo, WindowedValue } from "../values";
+import { ParDo, DoFn, ParDoParam } from "../transforms/pardo";
+import { WindowFn } from "../transforms/window";
+
+import {
+ ParamProviderImpl,
+ SideInputInfo,
+ createSideInputInfo,
+} from "./pardo_context";
+
+// Trying to get some of https://github.com/microsoft/TypeScript/issues/8240
+export const NonPromise = null;
+
+export type ProcessResult = null | Promise<void>;
+
+export class ProcessResultBuilder {
+ promises: Promise<void>[] = [];
+ add(result: ProcessResult) {
+ if (result != NonPromise) {
+ this.promises.push(result as Promise<void>);
+ }
+ }
+ build(): ProcessResult {
+ if (this.promises.length == 0) {
+ return NonPromise;
+ } else if (this.promises.length == 1) {
+ return this.promises[0];
+ } else {
+ return Promise.all(this.promises).then(() => void null);
+ }
+ }
+}
+
+export interface IOperator {
+ startBundle: () => Promise<void>;
+ // As this is called at every operator at every element, and the vast majority
+ // of the time Promises are not needed, we wish to avoid the overhead of
+ // creating promises and awaiting them as much as possible.
+ process: (wv: WindowedValue<unknown>) => ProcessResult;
+ finishBundle: () => Promise<void>;
+}
+
+export class Receiver {
+ constructor(private operators: IOperator[]) {}
+
+ receive(wvalue: WindowedValue<unknown>): ProcessResult {
+ if (this.operators.length == 1) {
+ return this.operators[0].process(wvalue);
+ } else {
+ const result = new ProcessResultBuilder();
+ for (const operator of this.operators) {
+ result.add(operator.process(wvalue));
+ }
+ return result.build();
+ }
+ }
+}
+
+export class OperatorContext {
+ pipelineContext: PipelineContext;
+ constructor(
+ public descriptor: ProcessBundleDescriptor,
+ public getReceiver: (string) => Receiver,
+ public getDataChannel: (string) => MultiplexingDataChannel,
+ public getStateProvider: () => StateProvider,
+ public getBundleId: () => string
+ ) {
+ this.pipelineContext = new PipelineContext(descriptor);
+ }
+}
+
+export function createOperator(
+ transformId: string,
+ context: OperatorContext
+): IOperator {
+ const transform = context.descriptor.transforms[transformId];
+ // Ensure receivers are eagerly created.
+ Object.values(transform.outputs).map(context.getReceiver);
+ let operatorConstructor = operatorsByUrn.get(transform.spec!.urn!);
+ if (operatorConstructor == undefined) {
+ throw new Error("Unknown transform type:" + transform.spec?.urn);
+ }
+ return operatorConstructor(transformId, transform, context);
+}
+
+type OperatorConstructor = (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+) => IOperator;
+interface OperatorClass {
+ new (
+ transformId: string,
+ transformProto: PTransform,
+ context: OperatorContext
+ ): IOperator;
+}
+
+const operatorsByUrn: Map<string, OperatorConstructor> = new Map();
+
+export function registerOperator(urn: string, cls: OperatorClass) {
Review Comment:
```suggestion
export function registerOperator(urn: string, cls: OperatorClass): IOperator {
```
##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import * as fnApi from "../proto/beam_fn_api";
+import { BeamFnStateClient } from "../proto/beam_fn_api.grpc-client";
+
+// TODO: (Extension) Lazy iteration via continuation tokens.
+// This will likely require promises all the way up to the consumer.
+
+interface PromiseWrapper<T> {
+ type: "promise";
+ promise: Promise<T>;
+}
+
+interface ValueWrapper<T> {
+ type: "value";
+ value: T;
+}
+
+// We want to avoid promises when not needed (e.g. for a cache hit) as they
+// have to bubble all the way up the stack.
+export type MaybePromise<T> = PromiseWrapper<T> | ValueWrapper<T>;
+
+export interface StateProvider {
+ getState: <T>(
+ stateKey: fnApi.StateKey,
+ decode: (data: Uint8Array) => T
+ ) => MaybePromise<T>;
+}
+
+// TODO: (Advanced) Cross-bundle caching.
+export class CachingStateProvider implements StateProvider {
+ underlying: StateProvider;
+ cache: Map<string, MaybePromise<any>> = new Map();
+
+ constructor(underlying: StateProvider) {
+ this.underlying = underlying;
+ }
+
+ getState<T>(stateKey: fnApi.StateKey, decode: (data: Uint8Array) => T) {
+ // TODO: (Perf) Consider caching on something lighter-weight than the full
+ // serialized key, only constructing this proto when interacting with
+ // the runner.
+ const cacheKey = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+ "base64"
+ );
+ if (this.cache.has(cacheKey)) {
+ return this.cache.get(cacheKey)!;
+ } else {
Review Comment:
Nit: It is probably cleaner to drop this `else` and unindent this block, since
the `if` branch already returns early.
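A minimal sketch of the suggested shape, using a hypothetical, simplified cache rather than the PR's `CachingStateProvider` (the names `SimpleCache` and `load` are assumptions made for illustration):

```typescript
// Hypothetical, simplified cache used only to illustrate the control flow;
// it is not the PR's CachingStateProvider.
class SimpleCache<T> {
  private cache: Map<string, T> = new Map();

  constructor(private load: (key: string) => T) {}

  get(key: string): T {
    if (this.cache.has(key)) {
      return this.cache.get(key)!;
    }
    // No else needed: the cache-hit branch above has already returned.
    const value = this.load(key);
    this.cache.set(key, value);
    return value;
  }
}

// Usage: the loader runs once, and the second lookup is served from the cache.
let loads = 0;
const cache = new SimpleCache<number>((key) => {
  loads++;
  return key.length;
});
cache.get("abc");
cache.get("abc");
console.log(loads); // 1
```

The behaviour is unchanged; the miss path just sits one indentation level shallower.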
##########
sdks/typescript/src/apache_beam/worker/data.ts:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as grpc from "@grpc/grpc-js";
+
+import { Elements } from "../proto/beam_fn_api";
+import {
+ ProcessBundleDescriptor,
+ ProcessBundleResponse,
+} from "../proto/beam_fn_api";
+import {
+ BeamFnDataClient,
+ IBeamFnDataClient,
+} from "../proto/beam_fn_api.grpc-client";
+
+export class MultiplexingDataChannel {
+ dataClient: BeamFnDataClient;
+ dataChannel: grpc.ClientDuplexStream<Elements, Elements>;
+
+ consumers: Map<string, Map<string, IDataChannel>> = new Map();
+
+ constructor(endpoint: string, workerId: string) {
+ const metadata = new grpc.Metadata();
+ metadata.add("worker_id", workerId);
+ this.dataClient = new BeamFnDataClient(
+ endpoint,
+ grpc.ChannelCredentials.createInsecure(),
+ {},
+ {}
+ );
+ this.dataChannel = this.dataClient.data(metadata);
+ this.dataChannel.on("data", async (elements) => {
+ console.log("data", elements);
+ for (const data of elements.data) {
+ const consumer = this.getConsumer(data.instructionId, data.transformId);
+ try {
+ await consumer.sendData(data.data);
+ if (data.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ for (const timers of elements.timers) {
+ const consumer = this.getConsumer(
+ timers.instructionId,
+ timers.transformId
+ );
+ try {
+ await consumer.sendTimers(timers.timerFamilyId, timers.timers);
+ if (timers.isLast) {
+ consumer.close();
+ }
+ } catch (error) {
+ consumer.onError(error);
+ }
+ }
+ });
+ }
+
+ close() {
+ this.dataChannel.end();
+ }
+
+ async registerConsumer(
+ bundleId: string,
+ transformId: string,
+ consumer: IDataChannel
+ ) {
+ consumer = truncateOnErrorDataChannel(consumer);
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (this.consumers.get(bundleId)!.has(transformId)) {
+ await (
+ this.consumers.get(bundleId)!.get(transformId) as BufferingDataChannel
+ ).flush(consumer);
+ }
+ this.consumers.get(bundleId)!.set(transformId, consumer);
+ }
+
+ unregisterConsumer(bundleId: string, transformId: string) {
+ this.consumers.get(bundleId)!.delete(transformId);
+ }
+
+ getConsumer(bundleId: string, transformId: string): IDataChannel {
+ if (!this.consumers.has(bundleId)) {
+ this.consumers.set(bundleId, new Map());
+ }
+ if (!this.consumers.get(bundleId)!.has(transformId)) {
Review Comment:
Nit:
```suggestion
} else if (!this.consumers.get(bundleId)!.has(transformId)) {
```
Issue Time Tracking
-------------------
Worklog Id: (was: 762323)
Time Spent: 3h 20m (was: 3h 10m)
> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
> Key: BEAM-1754
> URL: https://issues.apache.org/jira/browse/BEAM-1754
> Project: Beam
> Issue Type: New Feature
> Components: sdk-ideas
> Reporter: Diego Zuluaga
> Assignee: Kerry Donny-Clark
> Priority: P3
> Labels: node.js
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples
> highly comprehensible. However, I have to admit that for most Node.js
> developers who have little background in typed languages and are used to
> getting up to speed with frameworks incredibly fast, learning Dataflow might
> involve a steeper learning curve than they/we are used to. So, I wonder if at
> any point in time Dataflow will provide a Node.js SDK. Maybe this is out of
> the question, but I wanted to run it by the team as it would be awesome to
> have something along these lines!
> Thanks,
> Diego
> Question originally posted on SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python
--
This message was sent by Atlassian Jira
(v8.20.7#820007)