This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d22e84f5bd2b5be07a49c5d3c02b1859245c55e7
Author: Byron Ellis <[email protected]>
AuthorDate: Thu Aug 10 10:30:38 2023 -0700

    Adding more of the Swift SDK implementation
---
 .../Sources/ApacheBeam/Core/Environment.swift      | 44 ++++++++++-
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift | 63 ++++++++++++++++
 .../Sources/ApacheBeam/Core/Fn/SerialiableFn.swift | 38 ++++++++++
 .../Core/PCollection/AnyPCollectionStream.swift    | 86 ++++++++++++++++++++++
 .../Core/PCollection/PCollectionStream.swift       | 56 ++++++++++++++
 .../ApacheBeam/Internal/ProtoConversion.swift      |  2 +-
 .../ApacheBeam/Internal/Server+Endpoint.swift      | 24 ++++++
 .../Sources/ApacheBeam/Runtime/Worker/Worker.swift |  5 ++
 .../ApacheBeam/Runtime/Worker/WorkerProvider.swift | 60 +++++++++++++++
 9 files changed, 375 insertions(+), 3 deletions(-)

diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
index 9e45c4575c4..f3b0fb85cbc 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
@@ -21,15 +21,24 @@
 public struct Environment {
     
     public enum Category {
+        /// Default environment type. "default" is a reserved word so we use "system" here
         case system
-        case process(String,String,String)
+        /// Process environment. Associated values, in order: command, arch, os, environment variables
+        case process(String,String,String,[String:String])
+        /// Docker container image
         case docker(String)
+        /// External service using an api descriptor
         case external(ApiServiceDescriptor)
     }
     
     let category: Category
+    /// Capability strings copied verbatim into the environment proto.
+    let capabilities: [String]
+    /// Artifacts whose proto form is attached to the environment proto.
+    let dependencies: [ArtifactInfo]
+    
-    public init(_ category: Category = .system,capabilities:[String] = [],dependencies:[ArtifactInfo]) {
+    /// Creates an environment description.
+    /// - Parameters:
+    ///   - category: Kind of environment; defaults to `.system`.
+    ///   - capabilities: Capability strings; defaults to none.
+    ///   - dependencies: Artifact dependencies; defaults to none, matching the other
+    ///     parameters so that `Environment()` is a valid call.
+    public init(_ category: Category = .system,capabilities:[String] = [],dependencies:[ArtifactInfo] = []) {
         self.category = category
+        self.capabilities = capabilities
+        self.dependencies = dependencies
     }
 }
 
@@ -38,7 +47,38 @@ extension Environment : ProtoConversion {
     typealias Proto = EnvironmentProto
     
     var proto: EnvironmentProto {
-        .with { p in
+        get throws {
+            return try .with { p in
+                p.urn = switch category {
+                case .docker(_): .beamUrn("docker",type:"env")
+                case .system: .beamUrn("default",type:"env")
+                case .process(_, _, _, _): .beamUrn("process",type:"env")
+                case .external(_): .beamUrn("external",type:"env")
+                }
+                p.capabilities = capabilities
+                p.dependencies = dependencies.map({ $0.proto })
+                
+                if case let .docker(containerImage) = category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with {
+                        $0.containerImage = containerImage
+                    }.serializedData()
+                }
+                
+                if case let .external(endpoint) = category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with {
+                        $0.endpoint = endpoint.proto
+                    }.serializedData()
+                }
+                
+                if case let .process(command,arch,os,env) = category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with {
+                        $0.arch = arch
+                        $0.command = command
+                        $0.os = os
+                        $0.env = env
+                    }.serializedData()
+                }
+            }
         }
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
new file mode 100644
index 00000000000..82fd4708046
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoneFn.swift
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A `SerializableFn` that wraps a closure consuming a single typed input stream.
+///
+/// The closure receives the first input, viewed as a `PCollectionStream<Of>`. Once the
+/// closure returns, every output stream is finished.
+public final class DoneFn<Of> : SerializableFn {
+
+    private let fn: (PCollectionStream<Of>) async throws -> Void
+
+    /// - Parameter fn: Closure invoked with the first input stream on each `process` call.
+    public init(_ fn: @Sendable @escaping (PCollectionStream<Of>) async throws -> Void) {
+        self.fn = fn
+    }
+
+    /// Runs the wrapped closure against the first input, then finishes all outputs.
+    /// - Returns: The instruction and transform identifiers taken from `context`.
+    /// - Throws: `ApacheBeamError.runtimeError` when `inputs` is empty, or any error
+    ///   thrown by the wrapped closure. Outputs are not finished when an error is thrown.
+    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        // Report a missing input as an error instead of trapping on inputs[0].
+        guard let input = inputs.first else {
+            throw ApacheBeamError.runtimeError("DoneFn requires at least one input stream")
+        }
+        try await fn(input.stream())
+        for output in outputs {
+            output.finish()
+        }
+        return (context.instruction,context.transform)
+    }
+}
+
+/// A `SerializableFn` that wraps a closure taking a `Codable` parameter and a single
+/// typed input stream.
+///
+/// The parameter is exposed as JSON through `payload`; `process` decodes it again from
+/// `context.payload` rather than using the stored value.
+public final class ParameterizedDoneFn<Of,Param:Codable> : SerializableFn {
+
+    private let param: Param
+    private let fn: (Param,PCollectionStream<Of>) async throws -> Void
+
+    /// - Parameters:
+    ///   - param: Value that is JSON-encoded to form `payload`.
+    ///   - fn: Closure invoked with the decoded parameter and the first input stream.
+    public init(_ param: Param,_ fn: @Sendable @escaping (Param,PCollectionStream<Of>) async throws -> Void){
+        self.param = param
+        self.fn = fn
+    }
+
+    /// JSON encoding of the parameter supplied at construction time.
+    public var payload: Data {
+        get throws {
+            try JSONEncoder().encode(param)
+        }
+    }
+
+    /// Decodes the parameter from `context.payload`, runs the wrapped closure against
+    /// the first input, then finishes all outputs.
+    /// - Returns: The instruction and transform identifiers taken from `context`.
+    /// - Throws: `ApacheBeamError.runtimeError` when `inputs` is empty, a decoding error
+    ///   when the payload is not valid JSON for `Param`, or any error thrown by the
+    ///   wrapped closure. Outputs are not finished when an error is thrown.
+    public func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        // Report a missing input as an error instead of trapping on inputs[0].
+        guard let input = inputs.first else {
+            throw ApacheBeamError.runtimeError("ParameterizedDoneFn requires at least one input stream")
+        }
+        let decoded = try JSONDecoder().decode(Param.self, from: context.payload)
+        try await fn(decoded, input.stream())
+        for output in outputs {
+            output.finish()
+        }
+        return (context.instruction,context.transform)
+    }
+
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
new file mode 100644
index 00000000000..3a30066dc4b
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerialiableFn.swift
@@ -0,0 +1,38 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+import Foundation
+import Logging
+
+/// Values handed to `SerializableFn.process` for one invocation.
+public struct SerializableFnBundleContext {
+    /// Instruction identifier; the implementations in this module echo it back as the
+    /// first element of the `process` result.
+    let instruction: String
+    /// Transform identifier; echoed back as the second element of the `process` result.
+    let transform: String
+    /// Serialized function payload (for example, the JSON-encoded parameter of a
+    /// parameterized function).
+    let payload: Data
+    /// Logger made available to the function.
+    let log: Logger
+}
+
+/// SerializableFn is a protocol for functions that should be parameterized for the pipeline
+public protocol SerializableFn {
+    /// Serialized form of the function's parameters. Producing it may throw.
+    var payload: Data { get throws }
+
+    /// Processes the given inputs, writing to the given outputs.
+    /// - Returns: A pair of strings; the implementations in this module return the
+    ///   context's instruction and transform identifiers.
+    func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String)
+}
+
+/// Default implementations for functions that carry no payload.
+public extension SerializableFn {
+    /// An empty payload.
+    var payload: Data {
+        return Data()
+    }
+}
+
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
new file mode 100644
index 00000000000..da15c740771
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -0,0 +1,86 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// A type-erased wrapper around a `PCollectionStream<Of>`.
+///
+/// The wrapped stream is stored as `Any`; the closures captured at construction time
+/// know the concrete element type and cast back to it when invoked.
+public struct AnyPCollectionStream : AsyncSequence {
+    public typealias Element = Iterator.Element
+    public typealias AsyncIterator = Iterator
+
+    /// Iterator that forwards each `next()` call to a captured closure.
+    public struct Iterator : AsyncIteratorProtocol {
+        public typealias Element = (Any,Date,Window)
+
+        let nextClosure: () async throws -> Element?
+
+        public mutating func next() async throws -> Element? {
+            try await nextClosure()
+        }
+    }
+
+    /// The wrapped `PCollectionStream`, type-erased.
+    let value: Any
+    /// Given the wrapped stream, produces a closure yielding its elements one at a time.
+    let nextGenerator: (Any) -> (() async throws -> Iterator.Element?)
+    /// Given the wrapped stream and a value, emits the value when its type is recognized.
+    let emitClosure: (Any,Any) -> Void
+    /// Given the wrapped stream, finishes it.
+    let finishClosure: (Any) -> Void
+
+    /// Copies an existing type-erased stream.
+    public init(_ value: AnyPCollectionStream) {
+        self = value
+    }
+
+    /// Wraps a typed stream, capturing `Of` inside the stored closures.
+    public init<Of>(_ value: PCollectionStream<Of>) {
+        self.value = value
+
+        self.nextGenerator = { erased in
+            var typedIterator = (erased as! PCollectionStream<Of>).makeAsyncIterator()
+            return {
+                guard let next = await typedIterator.next() else {
+                    return nil
+                }
+                return (next.0 as Any, next.1, next.2)
+            }
+        }
+
+        self.finishClosure = { erased in
+            (erased as! PCollectionStream<Of>).finish()
+        }
+
+        self.emitClosure = { erased, candidate in
+            let target = erased as! PCollectionStream<Of>
+            // Checked in this order: BeamValue, type-erased element, typed element.
+            // A value matching none of these is ignored.
+            if let beamValue = candidate as? BeamValue {
+                target.emit(beamValue)
+            } else if let erasedElement = candidate as? Element {
+                target.emit((erasedElement.0 as! Of, erasedElement.1, erasedElement.2))
+            } else if let typedElement = candidate as? PCollectionStream<Of>.Element {
+                target.emit(typedElement)
+            }
+        }
+    }
+
+    public func makeAsyncIterator() -> Iterator {
+        Iterator(nextClosure: nextGenerator(value))
+    }
+
+    /// Returns the wrapped stream as `PCollectionStream<Out>`.
+    /// Force-casts, so it traps when `Out` is not the wrapped stream's element type.
+    public func stream<Out>() -> PCollectionStream<Out> {
+        value as! PCollectionStream<Out>
+    }
+
+    /// Finishes the wrapped stream.
+    public func finish() {
+        finishClosure(value)
+    }
+
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
new file mode 100644
index 00000000000..d54e2638230
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -0,0 +1,56 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import Foundation
+
+/// The worker side realization of a PCollection that supports reading and writing
+public final class PCollectionStream<Of> : AsyncSequence {
+    /// A value together with its timestamp and window.
+    public typealias Element = (Of,Date,Window)
+
+    private let stream: AsyncStream<Element>
+    private let emitter: AsyncStream<Element>.Continuation
+
+    public init() {
+        // Create the stream together with the continuation used to feed it.
+        let (stream, continuation) = AsyncStream<Element>.makeStream()
+        self.stream = stream
+        self.emitter = continuation
+    }
+
+    public func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
+        stream.makeAsyncIterator()
+    }
+
+    /// Marks the stream as complete.
+    public func finish() {
+        emitter.finish()
+    }
+
+    /// Emits a complete element.
+    public func emit(_ value: Element) {
+        emitter.yield(value)
+    }
+
+    /// Emits a value; the timestamp defaults to the current time and the window to `.global`.
+    public func emit(_ value: Of,timestamp: Date = .now,window:Window = .global) {
+        emit((value, timestamp, window))
+    }
+
+    /// Currently a no-op: the value is discarded.
+    public func emit(_ value: BeamValue) {
+    }
+
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
index 2a50e950386..2ab1cf85c66 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
@@ -19,7 +19,7 @@
 protocol ProtoConversion {
     associatedtype Proto
     
-    var proto: Proto { get }
+    var proto: Proto { get throws }
 }
 
 
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift
new file mode 100644
index 00000000000..f32da24fbe1
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Server+Endpoint.swift
@@ -0,0 +1,24 @@
+//
+//  Server+Endpoint.swift
+//  
+//
+//  Created by Byron Ellis on 8/9/23.
+//
+
+import GRPC
+
+extension Server {
+    /// Describes the address this server is listening on.
+    ///
+    /// A unix-domain-socket address yields a descriptor built from its path; otherwise
+    /// the IP address and port are used.
+    /// - Throws: `ApacheBeamError.runtimeError` when the channel has no local address,
+    ///   or when the address has neither a path nor an IP address and port.
+    var endpoint: ApiServiceDescriptor {
+        get throws {
+            // localAddress is optional; throw instead of trapping when it is missing.
+            guard let address = channel.localAddress else {
+                throw ApacheBeamError.runtimeError("Can't determine endpoint: server channel has no local address")
+            }
+            if let pathname = address.pathname {
+                return ApiServiceDescriptor(unixAddress: pathname)
+            }
+            if let host = address.ipAddress, let port = address.port {
+                return ApiServiceDescriptor(host: host, port: port)
+            }
+            throw ApacheBeamError.runtimeError("Can't determine endpoint for \(address)")
+        }
+    }
+}
+
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
new file mode 100644
index 00000000000..4bbbd44b889
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -0,0 +1,5 @@
+/// Placeholder worker actor; it holds no state yet.
+actor Worker {
+    /// - Parameter id: Worker identifier. Currently unused.
+    public init(id: String) {}
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
new file mode 100644
index 00000000000..fb5e2e52225
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import GRPC
+import Logging
+
+/// gRPC provider for the external worker pool service. Keeps a registry of workers
+/// keyed by worker ID.
+actor WorkerProvider : Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorkerPoolAsyncProvider {
+
+    private let log = Logger(label:"WorkerProvider")
+    /// Workers that have been started and not yet stopped, keyed by worker ID.
+    private var workers: [String:Worker] = [:]
+
+    private let functions: [String:SerializableFn]
+
+    /// - Parameter functions: Functions keyed by name; stored but not otherwise used here.
+    init(_ functions: [String:SerializableFn]) {
+        self.functions = functions
+    }
+
+    /// Starts a worker for the requested ID unless one is already registered.
+    ///
+    /// Failures are reported through the response's `error` field rather than thrown.
+    func startWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StartWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StartWorkerResponse {
+        log.info("Got request to start worker \(request.workerID)")
+        // Only presence matters here, so test for nil rather than binding an unused value.
+        guard workers[request.workerID] == nil else {
+            log.info("Worker \(request.workerID) is already running.")
+            return .with { _ in }
+        }
+        do {
+            workers[request.workerID] = try Worker(id: request.workerID)
+            return .with { _ in }
+        } catch {
+            log.error("Unable to start worker \(request.workerID): \(error)")
+            return .with {
+                $0.error = "\(error)"
+            }
+        }
+    }
+
+    /// Removes the worker registered under the requested ID, if any, so that a later
+    /// start request for the same ID creates a new worker instead of being reported
+    /// as already running.
+    func stopWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StopWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StopWorkerResponse {
+        if workers.removeValue(forKey: request.workerID) != nil {
+            log.info("Stopped worker \(request.workerID)")
+        }
+        return .with { _ in }
+    }
+
+}

Reply via email to