This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit f3f519d11d93923bf5b09aeb55ce1e1fca964d34 Author: Byron Ellis <[email protected]> AuthorDate: Wed Aug 16 18:22:27 2023 -0700 Add the rest of the code. Basic word count integration test now completes successfully with the Python Portable Runner. --- .../ApacheBeam/Client/ApiServiceDescriptor.swift | 38 +++- .../Sources/ApacheBeam/Coders/Coder+Decoding.swift | 2 +- sdks/swift/Sources/ApacheBeam/Coders/Coder.swift | 16 +- .../Sources/ApacheBeam/Core/ArtifactInfo.swift | 8 +- .../Sources/ApacheBeam/Core/Environment.swift | 50 ++++++ .../ApacheBeam/Core/Fn/SerializableFn.swift | 2 +- sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift | 2 +- .../Core/PCollection/AnyPCollectionStream.swift | 4 + .../Core/PCollection/PCollectionStream.swift | 22 +-- .../ApacheBeam/Core/Pipeline/Pipeline.swift | 18 +- .../ApacheBeam/Core/Pipeline/PipelineContext.swift | 6 +- .../ApacheBeam/Core/Pipeline/PipelineRunner.swift | 3 + .../ApacheBeam/Internal/Dictionary+Helpers.swift | 6 + .../Internal/PipelineProto+Environment.swift | 32 +--- .../ApacheBeam/Internal/ProtoConversion.swift | 2 +- .../ApacheBeam/Runners/PortableRunner.swift | 59 +++++++ .../Runtime/Bundle/BundleProcessor.swift | 113 ++++++++++++ .../Sources/ApacheBeam/Runtime/Bundle/Sink.swift | 26 +++ .../Sources/ApacheBeam/Runtime/Bundle/Source.swift | 43 +++++ .../ApacheBeam/Runtime/DataplaneClient.swift | 192 +++++++++++++++++++++ .../Sources/ApacheBeam/Runtime/Worker/Worker.swift | 72 +++++++- .../ApacheBeam/Runtime/Worker/WorkerProvider.swift | 37 +++- .../Sources/ApacheBeam/Transforms/Basic.swift | 3 + .../ApacheBeamTests/Pipeline/Fixtures/file1.txt | 4 + .../ApacheBeamTests/Pipeline/Fixtures/file2.txt | 3 + .../Pipeline/IntegrationTests.swift | 11 +- 26 files changed, 684 insertions(+), 90 deletions(-) diff --git a/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift index 40e1b662d8f..18d95403ebc 100644 --- 
a/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift +++ b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift @@ -16,6 +16,10 @@ * limitations under the License. */ +import GRPC +import NIOCore + + /// Representation of the API Service Descriptors used to communicate with runners (and vice versa) public struct ApiServiceDescriptor { @@ -41,16 +45,15 @@ extension ApiServiceDescriptor { extension ApiServiceDescriptor : ProtoConversion { - typealias Proto = Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor - var proto: Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor { - .with { $0.url = self.url } + func populate(_ proto: inout Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor) throws { + proto.url = self.url } +} +extension ApiServiceDescriptor : Hashable { } - - public extension ApiServiceDescriptor { static func from(env:String,format:EncodedAs = .textproto) throws -> ApiServiceDescriptor { switch format { @@ -61,3 +64,28 @@ public extension ApiServiceDescriptor { } } } + +public extension GRPCChannelPool { + static func with(endpoint:ApiServiceDescriptor, eventLoopGroup: EventLoopGroup) throws -> GRPCChannel { + let url = endpoint.url + //TODO: Transport Security configuration + if(url.starts(with: "unix://")) { + return try GRPCChannelPool.with(target: .unixDomainSocket(url.replacing("unix://",with:"")), + transportSecurity: .plaintext, + eventLoopGroup: eventLoopGroup) + } else { + if let lastNdx = url.lastIndex(of: ":") { + guard lastNdx.utf16Offset(in: url) > 0 else { + throw ApacheBeamError.runtimeError("Service URL must be of the form host:port") + } + let host = String(url.prefix(upTo: lastNdx)) + let port = Int(url.suffix(from:url.index(lastNdx,offsetBy:1)))! 
+ return try GRPCChannelPool.with(target: .host(host, port: port), + transportSecurity: .plaintext, + eventLoopGroup: eventLoopGroup) + } else { + throw ApacheBeamError.runtimeError("Service URL must be of the form host:port") + } + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift index 31b97f0d318..9eef4113576 100644 --- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift @@ -49,7 +49,7 @@ public extension Coder { case let .keyvalue(keyCoder, valueCoder): return .kv(try keyCoder.decode(&data), try valueCoder.decode(&data)) case let .iterable(coder): - let length = try data.next(Int32.self) + let length = try data.next(Int32.self).byteSwapped return .array(try (0..<length).map({ _ in try coder.decode(&data) })) case let .windowedvalue(valueCoder, windowCoder): let timestamp = try data.next(Int64.self).byteSwapped &+ Int64(-9223372036854775808) diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift index ceb2649243e..bf1ead4f6d8 100644 --- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift @@ -155,7 +155,7 @@ struct BundleCoderContainer : CoderContainer { } extension Coder { - static func of(name:String,in container:CoderContainer) -> Coder? { + static func of(name:String,in container:CoderContainer) throws -> Coder { if let baseCoder = container[name] { switch baseCoder.spec.urn { case "beam:coder:bytes:v1": @@ -167,24 +167,24 @@ extension Coder { case "beam:coder:double:v1": return .double case "beam:coder:iterable:v1": - return .iterable(.of(name: baseCoder.componentCoderIds[0], in: container)!) 
+ return .iterable(try .of(name: baseCoder.componentCoderIds[0], in: container)) case "beam:coder:kv:v1": return .keyvalue( - .of(name: baseCoder.componentCoderIds[0],in:container)!, - .of(name: baseCoder.componentCoderIds[1], in:container)!) + try .of(name: baseCoder.componentCoderIds[0],in:container), + try .of(name: baseCoder.componentCoderIds[1], in:container)) case "beam:coder:global_window:v1": return .globalwindow case "beam:coder:windowed_value:v1": return .windowedvalue( - .of(name: baseCoder.componentCoderIds[0],in:container)!, - .of(name: baseCoder.componentCoderIds[1],in:container)!) + try .of(name: baseCoder.componentCoderIds[0],in:container), + try .of(name: baseCoder.componentCoderIds[1],in:container)) case "beam:coder:length_prefix:v1": - return .lengthprefix(.of(name: baseCoder.componentCoderIds[0],in:container)!) + return .lengthprefix(try .of(name: baseCoder.componentCoderIds[0],in:container)) default: return .unknown(baseCoder.spec.urn) } } else { - return nil + throw ApacheBeamError.runtimeError("Unable to locate coder \(name) in container.") } } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift index 73c6112d680..ac5d26695b5 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift @@ -22,10 +22,8 @@ public struct ArtifactInfo { } extension ArtifactInfo : ProtoConversion { - var proto: Org_Apache_Beam_Model_Pipeline_V1_ArtifactInformation { - .with { - $0.roleUrn = .beamUrn(role,type:"artifact:type") - $0.typeUrn = .beamUrn(type,type:"artifact:role") - } + func populate(_ proto: inout Org_Apache_Beam_Model_Pipeline_V1_ArtifactInformation) throws { + proto.roleUrn = .beamUrn(role,type:"artifact:role") + proto.typeUrn = .beamUrn(type,type:"artifact:type") } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift index
fd09bf454e2..fe5f19d6fa6 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift @@ -42,3 +42,53 @@ public struct Environment { } } +extension Environment : ProtoConversion { + + func populate(_ proto: inout EnvironmentProto) throws { + proto.urn = switch category { + case .docker(_): .beamUrn("docker",type:"env") + case .system: .beamUrn("default",type:"env") + case .process(_, _, _, _): .beamUrn("process",type:"env") + case .external(_): .beamUrn("external",type:"env") + } + proto.capabilities = capabilities + + proto.dependencies = try dependencies.map({ artifact in + try .with { + try artifact.populate(&$0) + } + }) + + if case let .docker(containerImage) = category { + proto.payload = try Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with { + $0.containerImage = containerImage + }.serializedData() + } + + if case let .external(endpoint) = category { + proto.payload = try Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with { + $0.endpoint = try .with { + try endpoint.populate(&$0) + } + }.serializedData() + } + + if case let .process(command,arch,os,env) = category { + proto.payload = try Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with { + $0.arch = arch + $0.command = command + $0.os = os + $0.env = env + }.serializedData() + } + } + + public static func docker(_ imageName: String,capabilities:[String]=[],dependencies:[ArtifactInfo]=[]) -> Environment { + Environment(.docker(imageName),capabilities:capabilities,dependencies:dependencies) + } + + public static func external(_ endpoint: ApiServiceDescriptor,capabilities:[String]=[],dependencies:[ArtifactInfo]=[]) -> Environment { + Environment(.external(endpoint),capabilities:capabilities,dependencies:dependencies) + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift index 717b604deb1..80d23bbcc09 100644 --- 
a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift @@ -22,7 +22,7 @@ public struct SerializableFnBundleContext { let instruction:String let transform:String let payload:Data - let log:Logger + let log:Logging.Logger } /// SerializableFn is a protocol for functions that should be parameterized for the pipeline. This is intended as a fairly low level class and users diff --git a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift index 278ccb609cb..237abc8c645 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift @@ -35,7 +35,7 @@ public struct KV<Key,Value> : AnyKeyValue { self.values = [value] } - public init(_ key: Key,_ values: [Value]) { + public init(_ key: Key,values: [Value]) { self.key = key self.values = values } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift index df11ecd3780..4e5ea0bfb0c 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift @@ -78,6 +78,10 @@ public struct AnyPCollectionStream : AsyncSequence { return value as!
PCollectionStream<Out> } + public func emit(value element: Any) throws { + try emitClosure(value,element) + } + public func finish() { finishClosure(self.value) } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift index 1d2b3cbe151..38b8f795ac1 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift @@ -52,7 +52,7 @@ public final class PCollectionStream<Of> : AsyncSequence { // a more clever way of doing this, but I don't know what it is. func emit<K,V>(key: K,value: [V],timestamp: Date,window:Window) { - emit(KV(key,value) as! Of,timestamp:timestamp,window:window) + emit(KV(key,values:value) as! Of,timestamp:timestamp,window:window) } func emit<K>(key: K,value: BeamValue,timestamp: Date,window:Window) throws { @@ -63,11 +63,11 @@ public final class PCollectionStream<Of> : AsyncSequence { // won't be much in the way of new scalar values. if case .array(let array) = value { switch array.first { - case .boolean(_):emit(key:key,value:array.map({$0.baseValue as! Bool}),timestamp:timestamp,window:window) - case .bytes(_): emit(key:key,value:array.map({$0.baseValue as! Data}),timestamp:timestamp,window:window) - case .double(_): emit(key:key,value:array.map({$0.baseValue as! Double}),timestamp:timestamp,window:window) - case .integer(_):emit(key:key,value:array.map({$0.baseValue as! Int}),timestamp:timestamp,window:window) - case .string(_): emit(key:key,value:array.map({$0.baseValue as! String}),timestamp:timestamp,window:window) + case .boolean(_):emit(key:key,value:array.map({$0.baseValue! as! Bool}),timestamp:timestamp,window:window) + case .bytes(_): emit(key:key,value:array.map({$0.baseValue! as! Data}),timestamp:timestamp,window:window) + case .double(_): emit(key:key,value:array.map({$0.baseValue! as! 
Double}),timestamp:timestamp,window:window) + case .integer(_):emit(key:key,value:array.map({$0.baseValue! as! Int}),timestamp:timestamp,window:window) + case .string(_): emit(key:key,value:array.map({$0.baseValue! as! String}),timestamp:timestamp,window:window) default: throw ApacheBeamError.runtimeError("Can't use \(String(describing:array.first)) as a value in a key value pair") } @@ -90,11 +90,11 @@ public final class PCollectionStream<Of> : AsyncSequence { if case let .kv(key,value) = value { // Unwrap the key first switch key { - case let .boolean(v):try emit(key:v,value:value,timestamp:timestamp,window:window) - case let .bytes(v): try emit(key:v,value:value,timestamp:timestamp,window:window) - case let .double(v): try emit(key:v,value:value,timestamp:timestamp,window:window) - case let .integer(v):try emit(key:v,value:value,timestamp:timestamp,window:window) - case let .string(v): try emit(key:v,value:value,timestamp:timestamp,window:window) + case let .boolean(v):try emit(key:v!,value:value,timestamp:timestamp,window:window) + case let .bytes(v): try emit(key:v!,value:value,timestamp:timestamp,window:window) + case let .double(v): try emit(key:v!,value:value,timestamp:timestamp,window:window) + case let .integer(v):try emit(key:v!,value:value,timestamp:timestamp,window:window) + case let .string(v): try emit(key:v!,value:value,timestamp:timestamp,window:window) default: throw ApacheBeamError.runtimeError("Can't use \(value) as a value in a key value pair") } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift index ece88ed6799..116da114b67 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -10,15 +10,8 @@ public final class Pipeline { self.content = content } - public func run(loopback:Bool = false) throws { - let job = try context - log.info("\(job.proto)") - - var proto = job.proto - if 
loopback { - //If we are in loopback mode we want to replace the default environment - } - + public func run(_ runner: PipelineRunner) async throws { + try await runner.run(try self.context) } @@ -127,6 +120,7 @@ public final class Pipeline { .with { $0.uniqueName = uniqueName("c") $0.coderID = coder.name + $0.windowingStrategyID = defaultStrategy.name $0.isBounded = .bounded //TODO: Get this from the coder } } @@ -248,12 +242,10 @@ public final class Pipeline { } } } - - - + proto.rootTransformIds = rootIds } - return PipelineContext(pipeline,defaultEnvironment.name) + return PipelineContext(pipeline,defaultEnvironment.name,collections,fns) } } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift index d944b86a7d6..a173d1850ff 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift @@ -2,10 +2,14 @@ public final class PipelineContext { var proto : PipelineProto let defaultEnvironmentId: String + let collections: [String:AnyPCollection] + let pardoFns: [String:SerializableFn] - init(_ proto:PipelineProto,_ defaultEnvironmentId: String) { + init(_ proto:PipelineProto,_ defaultEnvironmentId: String,_ collections: [String:AnyPCollection],_ fns:[String:SerializableFn]) { self.proto = proto self.defaultEnvironmentId = defaultEnvironmentId + self.collections = collections + self.pardoFns = fns } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift new file mode 100644 index 00000000000..4e5e4f55d73 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift @@ -0,0 +1,3 @@ +public protocol PipelineRunner { + func run(_ context: PipelineContext) async throws +} diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift new file mode 100644 index 00000000000..a129508e85f --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift @@ -0,0 +1,6 @@ +public extension Dictionary where Key:Comparable { + /// Return key-value pairs sorted by key. + func sorted() -> [(Key,Value)] { + self.map({ ($0,$1)}).sorted(by: { $0.0 < $1.0 }) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift index 5207748237f..8e9d85d2a7a 100644 --- a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift +++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift @@ -2,36 +2,8 @@ extension PipelineProto { mutating func environment(from: Environment) throws -> PipelineComponent { return try environment { _ in - try .with { p in - p.urn = switch from.category { - case .docker(_): .beamUrn("docker",type:"env") - case .system: .beamUrn("default",type:"env") - case .process(_, _, _, _): .beamUrn("process",type:"env") - case .external(_): .beamUrn("external",type:"env") - } - p.capabilities = from.capabilities - p.dependencies = from.dependencies.map({ $0.proto }) - - if case let .docker(containerImage) = from.category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with { - $0.containerImage = containerImage - }.serializedData() - } - - if case let .external(endpoint) = from.category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with { - $0.endpoint = endpoint.proto - }.serializedData() - } - - if case let .process(command,arch,os,env) = from.category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with { - $0.arch = arch - $0.command = command - $0.os = os - $0.env = env - }.serializedData() - } + try .with { + try from.populate(&$0) } } } diff --git a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift index 2ab1cf85c66..d3342b3cffb 100644 --- a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift +++ b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift @@ -19,7 +19,7 @@ protocol ProtoConversion { associatedtype Proto - var proto: Proto { get throws } + func populate(_ proto: inout Proto) throws } diff --git a/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift b/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift new file mode 100644 index 00000000000..67737e0bf1a --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift @@ -0,0 +1,59 @@ +import Logging +import GRPC +import NIOCore + +public struct PortableRunner : PipelineRunner { + let loopback:Bool + let log: Logging.Logger + let host: String + let port: Int + + public init(host:String="localhost",port:Int=8073,loopback:Bool=false) { + self.loopback = loopback + self.log = .init(label: "PortableRunner") + self.host = host + self.port = port + } + + public func run(_ context: PipelineContext) async throws { + var proto = context.proto + if loopback { + //If we are in loopback mode we want to replace the default environment + //with an external environment that points to our local worker server + let worker = try WorkerServer(context.collections,context.pardoFns) + log.info("Running in LOOPBACK mode with a worker server at \(worker.endpoint).") + proto.components.environments[context.defaultEnvironmentId] = try .with { + try Environment.external(worker.endpoint).populate(&$0) + } + } + log.info("\(proto)") + log.info("Connecting to Portable Runner at \(host):\(port).") + let group = PlatformSupport.makeEventLoopGroup(loopCount: 1) + let channel = try GRPCChannelPool.with(target: .host(host, port: port), + transportSecurity: .plaintext, eventLoopGroup: group) + let client = Org_Apache_Beam_Model_JobManagement_V1_JobServiceAsyncClient(channel: channel) + let prepared = try await client.prepare(.with 
{ + $0.pipeline = proto + }) + let job = try await client.run(.with { + $0.preparationID = prepared.preparationID + }) + log.info("Submitted job \(job.jobID)") + var done = false + while !done { + let status = try await client.getState(.with { + $0.jobID = job.jobID + }) + log.info("Job \(job.jobID) status: \(status.state)") + switch status.state { + case .stopped,.failed,.done: + done = true + default: + try await Task.sleep(for: .seconds(5)) + } + } + log.info("Job completed.") + + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift new file mode 100644 index 00000000000..3863b8748f7 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift @@ -0,0 +1,113 @@ +import Logging + +import Foundation + +struct BundleProcessor { + let log: Logging.Logger + + struct Step { + let transformId: String + let fn: SerializableFn + let inputs: [AnyPCollectionStream] + let outputs: [AnyPCollectionStream] + let payload: Data + } + + let steps:[Step] + + init(id:String, + descriptor:Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor, + collections: [String:AnyPCollection], + fns: [String:SerializableFn]) throws { + self.log = Logging.Logger(label: "BundleProcessor(\(descriptor.id))") + + var temp: [Step] = [] + var coders = BundleCoderContainer(bundle:descriptor) + + var streams: [String:AnyPCollectionStream] = [:] + // First make streams for everything in this bundle (maybe I could use the pcollection array for this?) 
+ for (_,transform) in descriptor.transforms { + for id in transform.inputs.values { + if streams[id] == nil { + streams[id] = collections[id]!.anyStream + } + } + for id in transform.outputs.values { + if streams[id] == nil { + streams[id] = collections[id]!.anyStream + } + } + } + + + + for (_,transform) in descriptor.transforms { + let urn = transform.spec.urn + //Map the input and output streams in the correct order + let inputs = transform.inputs.sorted().map { streams[$0.1]! } + let outputs = transform.outputs.sorted().map { streams[$0.1]! } + if urn == "beam:runner:source:v1" { + let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload) + let coder = try Coder.of(name: remotePort.coderID, in: coders) + temp.append(Step( + transformId: transform.uniqueName, + fn:Source(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder), + inputs:inputs, + outputs:outputs, + payload:Data() + )) + } else if urn == "beam:runner:sink:v1" { + let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload) + let coder = try Coder.of(name: remotePort.coderID, in: coders) + temp.append(Step( + transformId: transform.uniqueName, + fn:Sink(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder), + inputs:inputs, + outputs:outputs, + payload:Data() + )) + + } else if urn == "beam:transform:pardo:v1" { + let pardoPayload = try Org_Apache_Beam_Model_Pipeline_V1_ParDoPayload(serializedData: transform.spec.payload) + if let fn = fns[transform.uniqueName] { + temp.append(Step(transformId: transform.uniqueName, + fn: fn, + inputs: inputs, + outputs: outputs, + payload: pardoPayload.doFn.payload)) + } else { + log.warning("Unable to map \(transform.uniqueName) to a known SerializableFn. Will be skipped during processing.") + } + } else { + log.warning("Unable to map \(urn). 
Will be skipped during processing.") + } + + } + self.steps = temp + } + + public func process(instruction: String,responder: AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation) async { + _ = await withThrowingTaskGroup(of: (String,String).self) { group in + do { + for step in self.steps { + let context = SerializableFnBundleContext(instruction: instruction, transform: step.transformId, payload: step.payload, log: self.log) + group.addTask { + return try await step.fn.process(context: context, inputs: step.inputs, outputs: step.outputs) + } + } + for try await (instruction,transform) in group { + log.info("Task Completed (\(instruction),\(transform))") + } + responder.yield(.with { + $0.instructionID = instruction + }) + } catch { + responder.yield(.with { + $0.instructionID = instruction + $0.error = "\(error)" + }) + } + } + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift new file mode 100644 index 00000000000..b399c13f1f6 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift @@ -0,0 +1,26 @@ +import Foundation + +final class Sink : SerializableFn { + let client: DataplaneClient + let coder: Coder + + public init(client: DataplaneClient,coder:Coder) { + self.client = client + self.coder = coder + + } + + + func process(context: SerializableFnBundleContext, + inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + let (_,emitter) = await client.makeStream(instruction: context.instruction, transform: context.transform) + for try await element in inputs[0] { + var output = Data() + try coder.encode(element, data: &output) + emitter.yield(.data(output)) + } + emitter.yield(.last(context.instruction, context.transform)) + emitter.finish() + return (context.instruction,context.transform) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift new file mode 100644 index 00000000000..b53ed8794fa --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift @@ -0,0 +1,43 @@ +import Foundation + +/// Custom SerializableFn that reads/writes from an external data stream using a defined coder. It assumes that a given +/// data element might contain more than one coder +final class Source : SerializableFn { + + let client: DataplaneClient + let coder: Coder + + public init(client: DataplaneClient,coder:Coder) { + self.client = client + self.coder = coder + + } + + + func process(context: SerializableFnBundleContext, + inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) { + let (stream,_) = await client.makeStream(instruction: context.instruction, transform: context.transform) + for await message in stream { + switch message { + case let .data(data): + var d = data + while d.count > 0 { + let value = try coder.decode(&d) + for output in outputs { + try output.emit(value: value) + } + } + case let .last(id, transform): + for output in outputs { + output.finish() + } + await client.finalizeStream(instruction: id, transform: transform) + return (id,transform) + //TODO: Handle timer messages + default: + break + } + } + return (context.instruction,context.transform) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift new file mode 100644 index 00000000000..ac959d3235d --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift @@ -0,0 +1,192 @@ +import Foundation +import GRPC +import Logging + +/// Client for handling the multiplexing and demultiplexing of Dataplane messages +actor DataplaneClient { + + public struct Pair : Hashable { + let id: String + let transform: String + } + + public enum Message { + case data(Data) + case timer(String,Data) + case last(String,String) + case flush + 
} + struct Multiplex { + let id: String + let transform: String + let message: Message + } + typealias InternalStream = AsyncStream<Multiplex> + typealias Stream = AsyncStream<Message> + + public struct MultiplexContinuation { + let id: String + let transform: String + let base: InternalStream.Continuation + + @discardableResult + func yield(_ value: Message) -> InternalStream.Continuation.YieldResult { + base.yield(Multiplex(id: id, transform: transform, message: value)) + } + + func finish() { + // Does nothing + } + + + + } + + + private let id: String + private let log: Logging.Logger + private let multiplex: (InternalStream,InternalStream.Continuation) + private var streams: [Pair:(Stream,Stream.Continuation,MultiplexContinuation)] = [:] + private let flush: Int + + public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=100) throws { + self.id = id + self.log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))") + self.multiplex = AsyncStream.makeStream(of:Multiplex.self) + self.flush = flush + let client = Org_Apache_Beam_Model_FnExecution_V1_BeamFnDataAsyncClient(channel:try GRPCChannelPool.with(endpoint: endpoint, eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)),defaultCallOptions: CallOptions(customMetadata:["worker_id":id])) + let stream = client.makeDataCall() + + // Mux task + Task { + log.info("Initiating data plane multiplexing.") + + let input = multiplex.0 + + var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements() + for try await element in input { + var shouldFlush: Bool = false + switch element.message { + + case .data(let payload): + elements.data.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.data = payload + }) + case let .timer(family, payload): + elements.timers.append(.with { + $0.instructionID = element.id + $0.transformID = element.transform + $0.timerFamilyID = family + $0.timers = payload + }) + case let .last(id, transform): + elements.data.append(.with { + 
$0.instructionID = id + $0.transformID = transform + $0.isLast = true + }) + shouldFlush = true + case .flush: + shouldFlush = true + } + if shouldFlush || elements.data.count + elements.timers.count >= flush { + do { + try await stream.requestStream.send(elements) + } catch { + log.error("Unable to multiplex elements onto data plane: \(error)") + } + elements = Org_Apache_Beam_Model_FnExecution_V1_Elements() + shouldFlush = false + } + } + } + + // Demux task + Task { + log.info("Initiating data plane demultiplexing.") + do { + + for try await elements in stream.responseStream { + var last: [Pair:Message] = [:] // Split out last calls so they are always at the end + var messages: [Pair:[Message]] = [:] + + for element in elements.data { + let key = Pair(id: element.instructionID, transform: element.transformID) + //Drop zero-length elements + if element.data.count > 0 { + messages[key,default:[]].append(.data(element.data)) + } + if element.isLast { + last[key] = .last(element.instructionID, element.transformID) + } + } + + for element in elements.timers { + let key = Pair(id:element.instructionID,transform:element.transformID) + if element.timers.count > 0 { + messages[key,default:[]].append(.timer(element.timerFamilyID, element.timers)) + } + if element.isLast { + last[key] = .last(element.instructionID, element.transformID) + } + } + + // Send the messages to registered sources + for (key,value) in messages { + let output = await self.makeStream(key:key).1 + for v in value { + output.yield(v) + } + } + // Send any last messages + for (key,value) in last { + let output = await self.makeStream(key: key).1 + output.yield(value) + } + } + } catch { + log.error("Lost data plane connection.") + } + + } + } + + /// Returns or creates a stream for a particular instruction,transform pair with both the multiplex and demultiplex continuations + internal func makeStream(key:Pair) -> (Stream,Stream.Continuation,MultiplexContinuation) { + if let existing = streams[key] { + 
return existing + } + let baseStream = AsyncStream.makeStream(of:Message.self) + let stream = (baseStream.0,baseStream.1, + MultiplexContinuation(id: key.id, transform: key.transform, base: multiplex.1)) + streams[key] = stream + return stream + } + + /// Returns or creates a stream for a particular instruction,transform pair with the multiplex continuation but not the demultiplex + /// which mirrors the response from AsyncStream.makeStream + public func makeStream(instruction:String,transform:String) -> (Stream,MultiplexContinuation) { + let key = Pair(id:instruction,transform: transform) + let (stream,_,continuation) = makeStream(key:key) + return (stream,continuation) + } + + func finalizeStream(instruction:String,transform:String) { + //TODO: Implement finalization. + } + + + + private static var dataplanes: [ApiServiceDescriptor:DataplaneClient] = [:] + public static func client(for endpoint:ApiServiceDescriptor,worker id: String) throws -> DataplaneClient { + if let client = dataplanes[endpoint] { + return client + } else { + let client = try DataplaneClient(id: id, endpoint: endpoint) + dataplanes[endpoint] = client + return client + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift index 4bbbd44b889..a9c2effee6b 100644 --- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift @@ -1,5 +1,75 @@ +import GRPC +import NIOCore +import Logging + actor Worker { - public init(id:String) { + private let id: String + private let collections: [String:AnyPCollection] + private let fns: [String:SerializableFn] + private let control: ApiServiceDescriptor + private let remoteLog: ApiServiceDescriptor + + private let log: Logging.Logger + + public init(id:String,control:ApiServiceDescriptor,log:ApiServiceDescriptor,collections:[String:AnyPCollection],functions: [String:SerializableFn]) { + self.id = id + 
self.collections = collections + self.fns = functions + self.control = control + self.remoteLog = log + + self.log = Logging.Logger(label: "Worker(\(id))") + } + + + + public func start() throws { + let group = PlatformSupport.makeEventLoopGroup(loopCount: 1) + let client = Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel: try GRPCChannelPool.with(endpoint: control, eventLoopGroup: group)) + let (responses,responder) = AsyncStream.makeStream(of:Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse.self) + let options = CallOptions(customMetadata: ["worker_id":id]) + let control = client.makeControlCall(callOptions: options) + + + //Start the response task. This will continue until a yield call is sent from responder + Task { + for await r in responses { + try await control.requestStream.send(r) + } + } + //Start the actual work task + Task { + log.info("Waiting for control plane instructions.") + var processors: [String:BundleProcessor] = [:] + + func processor(for bundle: String) async throws -> BundleProcessor { + if let processor = processors[bundle] { + return processor + } + let descriptor = try await client.getProcessBundleDescriptor(.with { $0.processBundleDescriptorID = bundle }) + let processor = try BundleProcessor(id: id, descriptor: descriptor, collections: collections, fns: fns) + processors[bundle] = processor + return processor + } + + + //This looks a little bit reversed from the usual because response don't need an initiating call + for try await instruction in control.responseStream { + switch instruction.request { + case .processBundle(let pbr): + try await processor(for:pbr.processBundleDescriptorID) + .process(instruction: instruction.instructionID,responder:responder) + break + default: + log.warning("Ignoring instruction \(instruction.instructionID). 
Not yet implemented.") + log.warning("\(instruction)") + responder.yield(.with { + $0.instructionID = instruction.instructionID + }) + } + } + log.info("Control plane connection has closed.") + } } } diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift index e45c3f7eacb..40d7768903f 100644 --- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift +++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift @@ -23,9 +23,12 @@ actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker private let log = Logging.Logger(label: "Worker") private var workers: [String:Worker] = [:] + private let collections: [String:AnyPCollection] private let functions: [String:SerializableFn] - init(_ functions: [String:SerializableFn]) { + + init(_ collections: [String:AnyPCollection],_ functions: [String:SerializableFn]) throws { + self.collections = collections self.functions = functions } @@ -35,15 +38,19 @@ actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker func startWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StartWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StartWorkerResponse { log.info("Got request to start worker \(request.workerID)") do { - if let worker = workers[request.workerID] { + if workers[request.workerID] != nil { log.info("Worker \(request.workerID) is already running.") return .with { _ in } } else { - workers[request.workerID] = try Worker(id: request.workerID) - } - return .with { _ in - + let worker = Worker(id: request.workerID, + control: ApiServiceDescriptor(proto:request.controlEndpoint), + log:ApiServiceDescriptor(proto: request.loggingEndpoint), + collections: collections, + functions: functions) + try await worker.start() + workers[request.workerID] = worker } + return .with { _ in } } catch { log.error("Unable to 
start worker \(request.workerID): \(error)") return .with { @@ -55,6 +62,22 @@ actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker func stopWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StopWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StopWorkerResponse { return .with { _ in } } +} + +public struct WorkerServer { + private let server: Server + + public let endpoint: ApiServiceDescriptor - + public init(_ collections: [String:AnyPCollection],_ fns: [String:SerializableFn],host:String="localhost",port:Int=0) throws { + server = try .insecure(group: PlatformSupport.makeEventLoopGroup(loopCount:1)) + .withServiceProviders([WorkerProvider(collections,fns)]) + .bind(host:host,port:port) + .wait() + if let port = server.channel.localAddress?.port { + endpoint = ApiServiceDescriptor(host: host, port: port) + } else { + throw ApacheBeamError.runtimeError("Unable to get server local address port.") + } + } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index 54c3ae4586e..b0e4d21edcb 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -16,6 +16,8 @@ public extension PCollection { /// Convenience logging mappers public extension PCollection { + + @discardableResult func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String { pardo(name,prefix) { prefix,input,output in for await element in input { @@ -25,6 +27,7 @@ public extension PCollection { } } + @discardableResult func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<KV<K,V>> where Of == KV<K,V> { pardo(name,prefix) { prefix,input,output in for await element in input { diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt new file 
mode 100644 index 00000000000..e177e396e79 --- /dev/null +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt @@ -0,0 +1,4 @@ +This file contains some text where we would like to count things +This is another line +This is a third line +Wow there are definitely some lines here. diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt new file mode 100644 index 00000000000..14bda52d20e --- /dev/null +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt @@ -0,0 +1,3 @@ +Another file for testing. +This one contains different lines from the first file.. +The fact that things are getting called is amazing though! diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift index 625ba1eafbd..d3743cfbc56 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -33,8 +33,8 @@ final class IntegrationTests: XCTestCase { override func tearDownWithError() throws { } - func testPortableWordcount() throws { - try Pipeline { pipeline in + func testPortableWordcount() async throws { + try await Pipeline { pipeline in let (contents,errors) = pipeline .create(["file1.txt","file2.txt","missing.txt"]) .pardo { filenames,output,errors in @@ -61,16 +61,17 @@ final class IntegrationTests: XCTestCase { .flatMap({ $0.components(separatedBy: .whitespaces) }) .groupBy({ ($0,1) }) .sum() + .log(prefix:"INTERMEDIATE OUTPUT") let normalizedCounts = baseCount.groupBy { ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters), $0.value ?? 1) }.sum() - _ = normalizedCounts.log(prefix:"COUNT OUTPUT") - _ = errors.log(prefix:"ERROR OUTPUT") + normalizedCounts.log(prefix:"COUNT OUTPUT") + errors.log(prefix:"ERROR OUTPUT") - }.run() + }.run(PortableRunner(loopback:true)) }
