This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit d22e84f5bd2b5be07a49c5d3c02b1859245c55e7 Author: Byron Ellis <[email protected]> AuthorDate: Thu Aug 10 10:30:38 2023 -0700 Adding more of the Swift SDK implementation --- .../Sources/ApacheBeam/Core/Environment.swift | 44 ++++++++++- sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift | 63 ++++++++++++++++ .../Sources/ApacheBeam/Core/Fn/SerialiableFn.swift | 38 ++++++++++ .../Core/PCollection/AnyPCollectionStream.swift | 86 ++++++++++++++++++++++ .../Core/PCollection/PCollectionStream.swift | 56 ++++++++++++++ .../ApacheBeam/Internal/ProtoConversion.swift | 2 +- .../ApacheBeam/Internal/Server+Endpoint.swift | 24 ++++++ .../Sources/ApacheBeam/Runtime/Worker/Worker.swift | 5 ++ .../ApacheBeam/Runtime/Worker/WorkerProvider.swift | 60 +++++++++++++++ 9 files changed, 375 insertions(+), 3 deletions(-) diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift index 9e45c4575c4..f3b0fb85cbc 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift @@ -21,15 +21,24 @@ public struct Environment { public enum Category { + /// Default environment type. "default" is a reserved word so we use "system" here case system - case process(String,String,String) + /// Process. command, arch, os, environment + case process(String,String,String,[String:String]) + /// Docker container image case docker(String) + /// External service using an api descriptor case external(ApiServiceDescriptor) } let category: Category + let capabilities: [String] + let dependencies: [ArtifactInfo] + public init(_ category: Category = .system,capabilities:[String] = [],dependencies:[ArtifactInfo]) { self.category = category + self.capabilities = capabilities + self.dependencies = dependencies } } @@ -38,7 +47,38 @@ extension Environment : ProtoConversion { typealias Proto = EnvironmentProto var proto: EnvironmentProto { - .with { p in + get throws { + return try .with { p in + p.urn = switch category { + case .docker(_): .beamUrn("docker",type:"env") + case .system: .beamUrn("default",type:"env") + case .process(_, _, _, _): .beamUrn("process",type:"env") + case .external(_): .beamUrn("external",type:"env") + } + p.capabilities = capabilities + p.dependencies = dependencies.map({ $0.proto }) + + if case let .docker(containerImage) = category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with { + $0.containerImage = containerImage + }.serializedData() + } + + if case let .external(endpoint) = category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with { + $0.endpoint = endpoint.proto + }.serializedData() + } + + if case let .process(command,arch,os,env) = category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with { + $0.arch = arch + $0.command = command + $0.os = os + $0.env = env + }.serializedData() + } + } } } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift new file mode 100644 index 00000000000..82fd4708046 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift @@ -0,0 +1,63 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import Foundation + +public final class DoneFn<Of> : SerializableFn { + + private let fn: (PCollectionStream<Of>) async throws -> Void + public init(_ fn: @Sendable @escaping (PCollectionStream<Of>) async throws -> Void) { + self.fn = fn + } + + + public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + try await fn(inputs[0].stream()) + for output in outputs { + output.finish() + } + return (context.instruction,context.transform) + } +} + +public final class ParameterizedDoneFn<Of,Param:Codable> : SerializableFn { + + + private let param: Param + private let fn: (Param,PCollectionStream<Of>) async throws -> Void + + public init(_ param: Param,_ fn: @Sendable @escaping (Param,PCollectionStream<Of>) async throws -> Void){ + self.param = param + self.fn = fn + } + + public var payload: Data { + get throws { + try JSONEncoder().encode(param) + } + } + + public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + try await fn(try JSONDecoder().decode(Param.self, from: context.payload),inputs[0].stream()) + for output in outputs { + output.finish() + } + return (context.instruction,context.transform) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift new file mode 100644 index 00000000000..3a30066dc4b --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift @@ -0,0 +1,38 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +import Foundation +import Logging + +public struct SerializableFnBundleContext { + let instruction:String + let transform:String + let payload:Data + let log:Logger +} + +/// SerialiableFn is a protocol for functions that should be parameterized for the pipeline +public protocol SerializableFn { + var payload: Data { get throws } + func process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream]) async throws -> (String,String) +} + +/// Provide some defaults where our function doesn't have any payload +public extension SerializableFn { + var payload : Data { Data() } +} + diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift new file mode 100644 index 00000000000..da15c740771 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift @@ -0,0 +1,86 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import Foundation + +public struct AnyPCollectionStream : AsyncSequence { + public typealias Element = Iterator.Element + public typealias AsyncIterator = Iterator + + public struct Iterator : AsyncIteratorProtocol { + public typealias Element = (Any,Date,Window) + + let nextClosure: () async throws -> Element? + public mutating func next() async throws -> Element? { + return try await nextClosure() + } + } + + let value: Any + let nextGenerator: (Any) -> (() async throws -> Iterator.Element?) + let emitClosure: (Any,Any) -> Void + let finishClosure: (Any) -> Void + + public func makeAsyncIterator() -> Iterator { + return Iterator(nextClosure: nextGenerator(value)) + } + + public init(_ value: AnyPCollectionStream) { + self = value + } + + public init<Of>(_ value: PCollectionStream<Of>) { + self.value = value + + self.emitClosure = { + let stream = ($0 as! PCollectionStream<Of>) + if let beamValue = $1 as? BeamValue { + stream.emit(beamValue) + } else if let element = $1 as? Element { + stream.emit((element.0 as! Of,element.1,element.2)) + } else if let element = $1 as? PCollectionStream<Of>.Element { + stream.emit(element) + } + } + + self.finishClosure = { + ($0 as! PCollectionStream<Of>).finish() + } + + self.nextGenerator = { + var iterator = ($0 as! PCollectionStream<Of>).makeAsyncIterator() + return { + if let element = await iterator.next() { + return (element.0 as Any,element.1,element.2) + } else { + return nil + } + } + } + } + + public func stream<Out>() -> PCollectionStream<Out> { + return value as! PCollectionStream<Out> + } + + public func finish() { + finishClosure(self.value) + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift new file mode 100644 index 00000000000..d54e2638230 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift @@ -0,0 +1,56 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import Foundation + +/// The worker side realization of a PCollection that supports reading and writing +public final class PCollectionStream<Of> : AsyncSequence { + public typealias Element = (Of,Date,Window) + + private let stream: AsyncStream<Element> + private let emitter: AsyncStream<Element>.Continuation + + public init() { + //Construct a stream, capturing the emit continuation + var tmp: AsyncStream<Element>.Continuation? + self.stream = AsyncStream<Element> { tmp = $0 } + self.emitter = tmp! + } + + public func makeAsyncIterator() -> AsyncStream<Element>.Iterator { + return stream.makeAsyncIterator() + } + + public func finish() { + emitter.finish() + } + + + public func emit(_ value: Element) { + emitter.yield(value) + } + + public func emit(_ value: Of,timestamp: Date = .now,window:Window = .global) { + emit((value,timestamp,window)) + } + + public func emit(_ value: BeamValue) { + + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift index 2a50e950386..2ab1cf85c66 100644 --- a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift +++ b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift @@ -19,7 +19,7 @@ protocol ProtoConversion { associatedtype Proto - var proto: Proto { get } + var proto: Proto { get throws } } diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift b/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift new file mode 100644 index 00000000000..f32da24fbe1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift @@ -0,0 +1,24 @@ +// +// File.swift +// +// +// Created by Byron Ellis on 8/9/23. +// + +import GRPC + +extension Server { + var endpoint: ApiServiceDescriptor { + get throws { + let address = self.channel.localAddress! + if let pathname = address.pathname { + return ApiServiceDescriptor(unixAddress: pathname) + } + if let host = address.ipAddress,let port = address.port { + return ApiServiceDescriptor(host: host, port: port) + } + throw ApacheBeamError.runtimeError("Can't determine endpoint for \(address)") + } + } +} + diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift new file mode 100644 index 00000000000..4bbbd44b889 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift @@ -0,0 +1,5 @@ +actor Worker { + public init(id:String) { + + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift new file mode 100644 index 00000000000..fb5e2e52225 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import GRPC +import Logging + +actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorkerPoolAsyncProvider { + + private let log = Logger(label:"WorkerProvider") + private var workers: [String:Worker] = [:] + + private let functions: [String:SerializableFn] + + init(_ functions: [String:SerializableFn]) { + self.functions = functions + } + + + + + func startWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StartWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StartWorkerResponse { + log.info("Got request to start worker \(request.workerID)") + do { + if let worker = workers[request.workerID] { + log.info("Worker \(request.workerID) is already running.") + return .with { _ in } + } else { + workers[request.workerID] = try Worker(id: request.workerID) + } + return .with { _ in + + } + } catch { + log.error("Unable to start worker \(request.workerID): \(error)") + return .with { + $0.error = "\(error)" + } + } + } + + func stopWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StopWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StopWorkerResponse { + return .with { _ in } + } + + +}
