lostluck commented on code in PR #1: URL: https://github.com/apache/beam-swift/pull/1#discussion_r1348085198
########## Sources/ApacheBeam/Coders/Beamable.swift: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// Codable is already taken and besides Beamable is too good to pass up Review Comment: Approved. ########## README.md: ########## @@ -0,0 +1,32 @@ +# Apache Beam Swift SDK + +Provides [Apache Beam][0] for Swift. Review Comment: I was going to ding this on https://www.apache.org/foundation/marks/pmcs#other grounds, but I think the "for" makes it clear, and satisfies the Apache branding rules. Optionally, we could add "SDK" for parity with the other language SDKs. ```suggestion Provides the [Apache Beam][0] SDK for Swift. ``` ########## Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Logging + +import Foundation + +struct BundleProcessor { + let log: Logging.Logger + + struct Step { + let transformId: String + let fn: SerializableFn + let inputs: [AnyPCollectionStream] + let outputs: [AnyPCollectionStream] + let payload: Data + } + + let steps: [Step] + + init(id: String, + descriptor: Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor, + collections: [String: AnyPCollection], + fns: [String: SerializableFn]) throws + { + log = Logging.Logger(label: "BundleProcessor(\(id) \(descriptor.id))") + + var temp: [Step] = [] + var coders = BundleCoderContainer(bundle: descriptor) + + var streams: [String: AnyPCollectionStream] = [:] + // First make streams for everything in this bundle (maybe I could use the pcollection array for this?) + for (_, transform) in descriptor.transforms { + for id in transform.inputs.values { + if streams[id] == nil { + streams[id] = collections[id]!.anyStream + } + } + for id in transform.outputs.values { + if streams[id] == nil { + streams[id] = collections[id]!.anyStream + } + } + } + + for (transformId, transform) in descriptor.transforms { + let urn = transform.spec.urn + // Map the input and output streams in the correct order + let inputs = transform.inputs.sorted().map { streams[$0.1]! } + let outputs = transform.outputs.sorted().map { streams[$0.1]! 
} + + if urn == "beam:runner:source:v1" { + let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload) + let coder = try Coder.of(name: remotePort.coderID, in: coders) + log.info("Source '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)") + try temp.append(Step( + transformId: transform.uniqueName == "" ? transformId : transform.uniqueName, + fn: Source(client: .client(for: ApiServiceDescriptor(proto: remotePort.apiServiceDescriptor), worker: id), coder: coder), + inputs: inputs, + outputs: outputs, + payload: Data() + )) + } else if urn == "beam:runner:sink:v1" { + let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload) + let coder = try Coder.of(name: remotePort.coderID, in: coders) + log.info("Sink '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)") + try temp.append(Step( + transformId: transform.uniqueName == "" ? transformId : transform.uniqueName, + fn: Sink(client: .client(for: ApiServiceDescriptor(proto: remotePort.apiServiceDescriptor), worker: id), coder: coder), + inputs: inputs, + outputs: outputs, + payload: Data() + )) + + } else if urn == "beam:transform:pardo:v1" { + let pardoPayload = try Org_Apache_Beam_Model_Pipeline_V1_ParDoPayload(serializedData: transform.spec.payload) + if let fn = fns[transform.uniqueName] { + temp.append(Step(transformId: transform.uniqueName, + fn: fn, + inputs: inputs, + outputs: outputs, + payload: pardoPayload.doFn.payload)) + } else { + log.warning("Unable to map \(transform.uniqueName) to a known SerializableFn. Will be skipped during processing.") + } + } else { + log.warning("Unable to map \(urn). 
Will be skipped during processing.") + } + } + steps = temp + } + + public func process(instruction: String, responder: AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation) async { + _ = await withThrowingTaskGroup(of: (String, String).self) { group in + log.info("Starting bundle processing for \(instruction)") + var count: Int = 0 + do { + for step in steps { Review Comment: So this basically connects together the input and output streams to a given DoFn directly, and has async handling for actually passing around input and output? Neat! I wonder how much scheduling overhead exists for Swift's async/await approach... (unbuffered Go channels have ~300ns overhead for scheduling in Go 1.21) I'm curious about error propagation here too, if a DoFn fails. Is the context/provenance clear from the stack trace though here? ########## Sources/ApacheBeam/Runtime/Bundle/Source.swift: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import Logging + +/// Custom SerializableFn that reads/writes from an external data stream using a defined coder. 
It assumes that a given +/// data element might contain more than one coder +final class Source: SerializableFn { + let client: DataplaneClient + let coder: Coder + let log: Logger + + public init(client: DataplaneClient, coder: Coder) { + self.client = client + self.coder = coder + log = Logger(label: "Source") + } + + func process(context: SerializableFnBundleContext, + inputs _: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) + { + log.info("Waiting for input on \(context.instruction)-\(context.transform)") + let (stream, _) = await client.makeStream(instruction: context.instruction, transform: context.transform) + + var messages = 0 + var count = 0 + for await message in stream { + messages += 1 + switch message { + case let .data(data): + var d = data + while d.count > 0 { + let value = try coder.decode(&d) + for output in outputs { + try output.emit(value: value) + count += 1 + } + } + case let .last(id, transform): + for output in outputs { + output.finish() + } + await client.finalizeStream(instruction: id, transform: transform) + log.info("Source \(context.instruction),\(context.transform) handled \(count) items over \(messages) messages") + return (id, transform) + // TODO: Handle timer messages + default: + log.info("Unhanled message \(message)") Review Comment: ```suggestion log.info("Unhandled message \(message)") ``` ########## Sources/ApacheBeam/Runtime/DataplaneClient.swift: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import GRPC +import Logging + +/// Client for handling the multiplexing and demultiplexing of Dataplane messages +actor DataplaneClient { + public struct Pair: Hashable { + let id: String + let transform: String + } + + public enum Message { + case data(Data) + case timer(String, Data) + case last(String, String) + case flush + } + + struct Multiplex { + let id: String + let transform: String + let message: Message + } + + typealias InternalStream = AsyncStream<Multiplex> + typealias Stream = AsyncStream<Message> + + public struct MultiplexContinuation { + let id: String + let transform: String + let base: InternalStream.Continuation + + @discardableResult + func yield(_ value: Message) -> InternalStream.Continuation.YieldResult { + base.yield(Multiplex(id: id, transform: transform, message: value)) + } + + func finish() { + // Does nothing + } + } + + private let id: String + private let log: Logging.Logger + private let multiplex: (InternalStream, InternalStream.Continuation) + private var streams: [Pair: (Stream, Stream.Continuation, MultiplexContinuation)] = [:] + private let flush: Int + + public init(id: String, endpoint: ApiServiceDescriptor, flush: Int = 1000) throws { + self.id = id + log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))") + multiplex = AsyncStream.makeStream(of: Multiplex.self) + self.flush = flush + let client = try Org_Apache_Beam_Model_FnExecution_V1_BeamFnDataAsyncClient(channel: GRPCChannelPool.with(endpoint: endpoint, eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)), 
defaultCallOptions: CallOptions(customMetadata: ["worker_id": id])) + let stream = client.makeDataCall() + + // Mux task + Task { + log.info("Initiating data plane multiplexing.") + + let input = multiplex.0 + var count = 0 + var flushes = 0 + + var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements() + for try await element in input { + var shouldFlush = false + switch element.message { + case let .data(payload): + elements.data.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.data = payload + }) + count += 1 + case let .timer(family, payload): + elements.timers.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.timerFamilyID = family + $0.timers = payload + }) + count += 1 + case let .last(id, transform): Review Comment: Two concerns here: 1. Same as before, if all true cases are executed this should be OK. 2. The Timer sequence and the Data sequence have independent "is_last" signals, https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L735+L761 so it's not clear to me that this will do the right thing. This is because they will be routed to different transforms (Data is always routed to a "datasource" transform, and timers to user transform hosted callbacks). ########## Sources/ApacheBeam/Runtime/DataplaneClient.swift: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import GRPC +import Logging + +/// Client for handling the multiplexing and demultiplexing of Dataplane messages +actor DataplaneClient { + public struct Pair: Hashable { + let id: String + let transform: String + } + + public enum Message { + case data(Data) + case timer(String, Data) + case last(String, String) + case flush + } + + struct Multiplex { + let id: String + let transform: String + let message: Message + } + + typealias InternalStream = AsyncStream<Multiplex> + typealias Stream = AsyncStream<Message> + + public struct MultiplexContinuation { + let id: String + let transform: String + let base: InternalStream.Continuation + + @discardableResult + func yield(_ value: Message) -> InternalStream.Continuation.YieldResult { + base.yield(Multiplex(id: id, transform: transform, message: value)) + } + + func finish() { + // Does nothing + } + } + + private let id: String + private let log: Logging.Logger + private let multiplex: (InternalStream, InternalStream.Continuation) + private var streams: [Pair: (Stream, Stream.Continuation, MultiplexContinuation)] = [:] + private let flush: Int + + public init(id: String, endpoint: ApiServiceDescriptor, flush: Int = 1000) throws { + self.id = id + log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))") + multiplex = AsyncStream.makeStream(of: Multiplex.self) + self.flush = flush + let client = try Org_Apache_Beam_Model_FnExecution_V1_BeamFnDataAsyncClient(channel: GRPCChannelPool.with(endpoint: endpoint, eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)), 
defaultCallOptions: CallOptions(customMetadata: ["worker_id": id])) + let stream = client.makeDataCall() + + // Mux task + Task { + log.info("Initiating data plane multiplexing.") + + let input = multiplex.0 + var count = 0 + var flushes = 0 + + var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements() + for try await element in input { + var shouldFlush = false + switch element.message { + case let .data(payload): + elements.data.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.data = payload + }) + count += 1 + case let .timer(family, payload): + elements.timers.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.timerFamilyID = family + $0.timers = payload + }) + count += 1 + case let .last(id, transform): Review Comment: Oh! This is the "writing" side. Probably fine then. ########## Sources/ApacheBeam/Runtime/Bundle/Source.swift: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import Logging + +/// Custom SerializableFn that reads/writes from an external data stream using a defined coder. 
It assumes that a given +/// data element might contain more than one coder +final class Source: SerializableFn { + let client: DataplaneClient + let coder: Coder + let log: Logger + + public init(client: DataplaneClient, coder: Coder) { + self.client = client + self.coder = coder + log = Logger(label: "Source") + } + + func process(context: SerializableFnBundleContext, + inputs _: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) + { + log.info("Waiting for input on \(context.instruction)-\(context.transform)") + let (stream, _) = await client.makeStream(instruction: context.instruction, transform: context.transform) + + var messages = 0 + var count = 0 + for await message in stream { + messages += 1 + switch message { + case let .data(data): + var d = data + while d.count > 0 { Review Comment: Just confirming that this handling for the data buffers appears correct. Multiple elements are parsed through. Technically the main "missing" bit is propagating out the count as a progress signal, but that's definitely a performance optimization once you start supporting channel splits. ########## Sources/ApacheBeam/Coders/Coder+Decoding.swift: ########## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +/// This extension contains all of the decoding implementation. File separation is for clarity. +public extension Coder { + /// Decodes a raw data block into a BeamValue for further processing + func decode(_ data: inout Data) throws -> BeamValue { + switch self { + // Scalar values check for size 0 input data and return null if that's a problem + + case .bytes: + return try .bytes(data.count == 0 ? Data() : data.subdata()) + case .string: + return try .string(data.count == 0 ? "" : String(data: data.subdata(), encoding: .utf8)) + case .varint: + return try .integer(data.count == 0 ? nil : data.varint()) + case .fixedint: + return try .integer(data.count == 0 ? nil : data.next(Int.self)) + case .byte: + return try .integer(data.count == 0 ? nil : Int(data.next(UInt8.self))) + case .boolean: + return try .boolean(data.count == 0 ? nil : data.next(UInt8.self) != 0) + case .double: + return try .double(data.count == 0 ? nil : data.next(Double.self)) + case .globalwindow: + return .window(.global) + case let .lengthprefix(coder): // Length prefix basically serves to make values nullable + var subdata = try data.subdata() + return try coder.decode(&subdata) + case let .keyvalue(keyCoder, valueCoder): + return try .kv(keyCoder.decode(&data), valueCoder.decode(&data)) + case let .iterable(coder): + let length = try data.next(Int32.self) + return try .array((0 ..< length).map { _ in try coder.decode(&data) }) + case let .windowedvalue(valueCoder, windowCoder): + // This will be big endian to match java + let timestamp = try data.instant() + + let windowCount = try data.next(Int32.self) + if windowCount > 1 { + throw ApacheBeamError.runtimeError("Windowed values with > 1 window not yet supported") Review Comment: Technically, the only thing to do for multiple windows is Explode their elements into individual processing if observed (eg. 
explicitly, or via side inputs) ########## Sources/ApacheBeam/Runtime/Bundle/Source.swift: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import Logging + +/// Custom SerializableFn that reads/writes from an external data stream using a defined coder. 
It assumes that a given +/// data element might contain more than one coder +final class Source: SerializableFn { + let client: DataplaneClient + let coder: Coder + let log: Logger + + public init(client: DataplaneClient, coder: Coder) { + self.client = client + self.coder = coder + log = Logger(label: "Source") + } + + func process(context: SerializableFnBundleContext, + inputs _: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) + { + log.info("Waiting for input on \(context.instruction)-\(context.transform)") + let (stream, _) = await client.makeStream(instruction: context.instruction, transform: context.transform) + + var messages = 0 + var count = 0 + for await message in stream { + messages += 1 + switch message { + case let .data(data): + var d = data + while d.count > 0 { + let value = try coder.decode(&d) + for output in outputs { + try output.emit(value: value) + count += 1 + } + } + case let .last(id, transform): Review Comment: Are these cases mutually exclusive, or will all true cases run? It's legal for runners to send the final data at the same time as "is_last == true", so we wouldn't want to drop data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
