This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 534b3e4c2c9b50c9ff9e1d2f7c096ec55cdda3a3
Author: Byron Ellis <[email protected]>
AuthorDate: Tue Aug 15 10:07:59 2023 -0700

    Moving some stuff around to make the proto construction code easier to read
---
 sdks/swift/README.md                               |  19 ++
 .../Sources/ApacheBeam/Core/Environment.swift      |  40 ----
 .../ApacheBeam/Core/Fn/SerializableFn.swift        |   2 +
 sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift  |   6 +-
 .../Core/PCollection/AnyPCollection.swift          |   9 +
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        | 245 +++++++++++++++++++++
 .../ApacheBeam/Core/Pipeline/PipelineContext.swift |  11 +
 .../Core/Pipeline/PipelineTransform.swift          |   2 +-
 sdks/swift/Sources/ApacheBeam/Core/Windowing.swift |   4 -
 .../ApacheBeam/Internal/Array+Helpers.swift        |   5 +
 .../ApacheBeam/Internal/PipelineProto+Coder.swift  |  28 +++
 .../Internal/PipelineProto+Environment.swift       |  38 ++++
 .../Sources/ApacheBeam/Transforms/Basic.swift      |  12 +
 .../Pipeline/IntegrationTests.swift                |   9 +-
 14 files changed, 381 insertions(+), 49 deletions(-)

diff --git a/sdks/swift/README.md b/sdks/swift/README.md
new file mode 100644
index 00000000000..810ccae8b24
--- /dev/null
+++ b/sdks/swift/README.md
@@ -0,0 +1,19 @@
+# Swift SDK for Beam
+
+The Swift SDK for Beam is a "portable" Beam implementation written in native 
Swift. 
+
+## Usage
+
+To use the Swift SDK for Beam you should add it to your own executable package 
as a dependency:
+```
+let package = Package(
+    dependencies:[
+        .package(url:"https://github.com/apache/beam/sdks/swift", from: 
"2.51.0")
+    ],
+    targets:[
+        // targets
+    ]
+)
+```
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
index f3b0fb85cbc..fd09bf454e2 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
@@ -42,43 +42,3 @@ public struct Environment {
     }
 }
 
-
-extension Environment : ProtoConversion {
-    typealias Proto = EnvironmentProto
-    
-    var proto: EnvironmentProto {
-        get throws {
-            return try .with { p in
-                p.urn = switch category {
-                case .docker(_): .beamUrn("docker",type:"env")
-                case .system: .beamUrn("default",type:"env")
-                case .process(_, _, _, _): .beamUrn("process",type:"env")
-                case .external(_): .beamUrn("external",type:"env")
-                }
-                p.capabilities = capabilities
-                p.dependencies = dependencies.map({ $0.proto })
-                
-                if case let .docker(containerImage) = category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with {
-                        $0.containerImage = containerImage
-                    }.serializedData()
-                }
-                
-                if case let .external(endpoint) = category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with {
-                        $0.endpoint = endpoint.proto
-                    }.serializedData()
-                }
-                
-                if case let .process(command,arch,os,env) = category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with {
-                        $0.arch = arch
-                        $0.command = command
-                        $0.os = os
-                        $0.env = env
-                    }.serializedData()
-                }
-            }
-        }
-    }
-}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index babf64dfe31..717b604deb1 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -28,11 +28,13 @@ public struct SerializableFnBundleContext {
/// SerializableFn is a protocol for functions that should be parameterized for 
the pipeline. This is intended as a fairly low level class and users
 /// should interact with the apply() functions defined in the transform 
section or implement the DoFn protocol which is then wrapped
 public protocol SerializableFn {
+    var  urn: String { get }
     var  payload: Data { get throws }
     func 
process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream])
 async throws -> (String,String)
 }
 
 /// Provide some defaults where our function doesn't have any payload
 public extension SerializableFn {
+    var urn: String { .beamUrn("dofn",type:"swift:transform") }
     var payload : Data { Data() }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift 
b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
index f8cd8f735c0..278ccb609cb 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
@@ -59,4 +59,8 @@ public struct KV<Key,Value> : AnyKeyValue {
     public var anyValue: Any? { value }
 }
 
-
+extension KV : Beamable {
+    public static var coder: Coder {
+        return .keyvalue(.of(type: Key.self)!, .of(type: Value.self)!)
+    }
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 8f23dccb954..9d2aa8e8c9f 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -49,3 +49,12 @@ public struct AnyPCollection : PCollectionProtocol {
     }
     
 }
+
+extension AnyPCollection : Hashable {
+    public func hash(into hasher: inout Hasher) {
+        hasher.combine(ObjectIdentifier(self.collection as AnyObject))
+    }
+    public static func ==(lhs: AnyPCollection,rhs:AnyPCollection) -> Bool {
+        return ObjectIdentifier(lhs.collection as AnyObject) == 
ObjectIdentifier(rhs.collection as AnyObject)
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index f918a620ea1..ece88ed6799 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -10,5 +10,250 @@ public final class Pipeline {
         self.content = content
     }
     
+    public func run(loopback:Bool = false) throws {
+        let job = try context
+        log.info("\(job.proto)")
+        
+        var proto = job.proto
+        if loopback {
+            //If we are in loopback mode we want to replace the default 
environment
+        }
+        
+    }
+    
+    
+    /// For managing the pipeline items to visit
+    enum Visit {
+        case transform([PipelineComponent],PipelineTransform)
+        case collection(AnyPCollection)
+    }
     
+    var context: PipelineContext {
+        get throws {
+            // Grab the pipeline content using a new root
+            var root = PCollection<Never>(coder:.unknown(.coderUrn("never")))
+            _ = content(&root)
+            
+            // These get passed to the pipeline context
+            var collections: [String:AnyPCollection] = [:]
+            var fns: [String:SerializableFn] = [:]
+            var coders: [String:Coder] = [:]
+            var counter: Int = 1
+            
+            // These caches are just used internally
+            var collectionCache: [AnyPCollection:PipelineComponent] = [:]
+            var coderCache: [Coder:PipelineComponent] = [:]
+            var rootIds: [String] = []
+            
+            var defaultEnvironment: PipelineComponent = .none
+            
+            //TODO: Support for composite PTransforms
+            let pipeline: PipelineProto = try .with { proto in
+                
+                func uniqueName(_ prefix: String = "id") -> String {
+                    let output = "\(prefix)\(counter)"
+                    counter = counter + 1
+                    return output
+                }
+                
+                
+                
+                /// We need to define this inside the with to prevent 
concurrent access errors.
+                func coder(from:Coder) -> PipelineComponent {
+                    if let cached = coderCache[from] {
+                        return cached
+                    }
+                    let componentCoders:[String] = switch from {
+                    case let .keyvalue(keyCoder, valueCoder):
+                        [coder(from:keyCoder).name,coder(from:valueCoder).name]
+                    case let .iterable(valueCoder):
+                        [coder(from:valueCoder).name]
+                    case let .lengthprefix(valueCoder):
+                        [coder(from:valueCoder).name]
+                    case let .windowedvalue(valueCoder, windowCoder):
+                        
[coder(from:valueCoder).name,coder(from:windowCoder).name]
+                    default:
+                        []
+                    }
+                    let baseCoder = proto.coder { _ in
+                            .with {
+                                $0.spec = .with {
+                                    $0.urn = from.urn
+                                    if case .custom(let data) = from {
+                                        $0.payload = data
+                                    }
+                                }
+                                $0.componentCoderIds = componentCoders
+                            }
+                    }
+                    coderCache[from] = baseCoder
+                    coders[baseCoder.name] = from
+                    return baseCoder
+                }
+                
+                /// Define the default environment for this pipeline
+                defaultEnvironment = try 
proto.environment(from:.init(.docker("swift:image"),
+                                                                      
capabilities:Coder.capabilities,
+                                                                      
dependencies:[]))
+                
+                /// Define the default strategy
+                let globalWindow = coder(from:.globalwindow)
+                let defaultStrategy = proto.windowingStrategy { _ in
+                        .with {
+                            $0.windowCoderID = globalWindow.name
+                            $0.windowFn = .with {
+                                $0.urn = 
.beamUrn("global_windows",type:"window_fn")
+                            }
+                            $0.mergeStatus = .nonMerging
+                            $0.trigger = .with {
+                                $0.default = .init()
+                            }
+                            $0.accumulationMode = .discarding
+                            $0.outputTime = .endOfWindow
+                            $0.closingBehavior = .emitIfNonempty
+                            $0.onTimeBehavior = .fireIfNonempty
+                            $0.environmentID = defaultEnvironment.name
+                        }
+                }
+                
+                
+                /// As above we define this within the "with" to prevent 
concurrent access errors.
+                func collection(from collection:AnyPCollection) -> 
PipelineComponent {
+                    if let cached = collectionCache[collection] {
+                        return cached
+                    }
+                    let coder = coder(from:collection.coder)
+                    let output = proto.collection { _ in
+                            .with {
+                                $0.uniqueName = uniqueName("c")
+                                $0.coderID = coder.name
+                                $0.isBounded = .bounded //TODO: Get this from 
the coder
+                            }
+                    }
+                    collectionCache[collection] = output
+                    collections[output.name] = collection
+                    return output
+                }
+                
+                func transform(name:String = "",_ fn: @escaping 
(String,String) throws -> PTransformProto) throws -> PipelineComponent {
+                    return try proto.transform { ref in
+                        return try fn(ref,name.count > 0 ? 
name+uniqueName(".t") : uniqueName("t"))
+                    }
+                }
+                
+                var toVisit: [Visit] = root.consumers.map({ .transform([], 
$0)})
+                var visited = Set<AnyPCollection>() // Cycle detection, etc
+                
+                while toVisit.count > 0 {
+                    let item = toVisit.removeFirst()
+                    if case .transform(let parents, let pipelineTransform) = 
item {
+                        let inputs = parents.enumerated().map({ 
("\($0)","\($1.name)")}).dict()
+                        switch pipelineTransform {
+                            
+                        case let .pardo(n, fn, o):
+                            let outputs = o.enumerated().map {
+                                ("\($0)",collection(from: $1).name)
+                            }.dict()
+                            let p = try transform(name:n) { _,name in
+                                    try .with {
+                                        $0.uniqueName = name
+                                        $0.inputs = inputs
+                                        $0.outputs = outputs
+                                        $0.spec = try .with {
+                                            $0.urn = .transformUrn("pardo")
+                                            $0.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ParDoPayload.with {
+                                                $0.doFn = try .with {
+                                                    $0.urn = fn.urn
+                                                    $0.payload = try fn.payload
+
+                                                }
+                                            }.serializedData()
+                                        }
+                                        $0.environmentID = 
defaultEnvironment.name
+                                    }
+                            }
+                            rootIds.append(p.name) //TODO: Composite transform 
handling
+                            fns[p.transform!.uniqueName] = fn
+                            toVisit.append(contentsOf: o.map { .collection($0) 
})
+                        case .impulse(let o):
+                            let outputs = [o].enumerated().map {
+                                ("\($0)",collection(from: $1).name)
+                            }.dict()
+                            let p = try transform { _,name in
+                                    .with {
+                                        $0.uniqueName = name
+                                        $0.outputs = outputs
+                                        $0.spec = .with {
+                                            $0.urn = .transformUrn("impulse")
+                                        }
+                                        $0.environmentID = 
defaultEnvironment.name
+                                    }
+                            }
+                            rootIds.append(p.name)
+                            toVisit.append(.collection(o))
+                        case .flatten(_, _):
+                            throw ApacheBeamError.runtimeError("flatten not 
implemented yet")
+                        case .groupByKey(let o):
+                            let outputs = [o].enumerated().map {
+                                ("\($0)",collection(from: $1).name)
+                            }.dict()
+                            let p = try transform { _,name in
+                                    .with {
+                                        $0.uniqueName = name
+                                        $0.inputs = inputs
+                                        $0.outputs = outputs
+                                        $0.spec = .with {
+                                            $0.urn = 
.transformUrn("group_by_key")
+                                        }
+                                        $0.environmentID = 
defaultEnvironment.name
+                                    }
+                            }
+                            rootIds.append(p.name)
+                            toVisit.append(.collection(o))
+                        case let .custom(urn,payload,env,o):
+                            let outputs = o.enumerated().map {
+                                ("\($0)",collection(from: $1).name)
+                            }.dict()
+                            let environment = if let e = env {
+                                try proto.environment(from: e)
+                            } else {
+                                defaultEnvironment
+                            }
+                            let p = try transform { _,name in
+                                    .with {
+                                        $0.uniqueName = name
+                                        $0.inputs = inputs
+                                        $0.outputs = outputs
+                                        $0.spec = .with {
+                                            $0.urn = urn
+                                            $0.payload = payload
+                                        }
+                                        $0.environmentID = environment.name
+                                    }
+                            }
+                            rootIds.append(p.name)
+                            toVisit.append(contentsOf:o.map { .collection($0) 
})
+                        case .composite(_):
+                            throw ApacheBeamError.runtimeError("Composite 
transforms are not yet implemented")
+                        }
+                    } else if case .collection(let anyPCollection) = item {
+                        if visited.contains(anyPCollection) {
+                            throw ApacheBeamError.runtimeError("Pipeline 
definition contains a cycle.")
+                        }
+                        visited.insert(anyPCollection)
+                        //TODO: Remove this to see if we can recreate the 
error I was seeing earlier for robertwb
+                        if anyPCollection.consumers.count > 0 {
+                            let me = collection(from:anyPCollection)
+                            toVisit.append(contentsOf: 
anyPCollection.consumers.map({ .transform([me], $0)}))
+                        }
+                    }
+                }
+
+                
+                
+                
+            }
+            return PipelineContext(pipeline,defaultEnvironment.name)
+        }
+    }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
new file mode 100644
index 00000000000..d944b86a7d6
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
@@ -0,0 +1,11 @@
+public final class PipelineContext {
+
+    var proto : PipelineProto
+    let defaultEnvironmentId: String
+
+    init(_ proto:PipelineProto,_ defaultEnvironmentId: String) {
+        self.proto = proto
+        self.defaultEnvironmentId = defaultEnvironmentId
+    }
+    
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
index dea7daab738..7a54ecbc685 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -8,7 +8,7 @@ public enum PipelineTransform {
     case impulse(AnyPCollection)
     case flatten([AnyPCollection],AnyPCollection)
     case groupByKey(AnyPCollection)
-    case custom(String,Data,[AnyPCollection])
+    case custom(String,Data,Environment?,[AnyPCollection])
     case composite(AnyPTransform)
 }
 
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift
index 5727b6c3741..aeb71a39000 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift
@@ -35,10 +35,6 @@ public enum Window {
     }
 }
 
-public enum WindowingStrategy {
-    
-}
-
 public enum Timing : String {
     case early = "EARLY"
     case onTime = "ON_TIME"
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift
new file mode 100644
index 00000000000..61c788c0387
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift
@@ -0,0 +1,5 @@
+extension Array where Element == (String,String) {
+    func dict() -> [String:String] {
+        reduce(into:[:],{ $0[$1.0] = $1.1} )
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift
new file mode 100644
index 00000000000..2ad5c73cfdb
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift
@@ -0,0 +1,28 @@
+extension PipelineProto {
+    
+    mutating func coder(from: Coder) -> PipelineComponent {
+        let componentCoders:[String] = switch from {
+        case let .keyvalue(keyCoder, valueCoder):
+            [coder(from:keyCoder).name,coder(from:valueCoder).name]
+        case let .iterable(valueCoder):
+            [coder(from:valueCoder).name]
+        case let .lengthprefix(valueCoder):
+            [coder(from:valueCoder).name]
+        case let .windowedvalue(valueCoder, windowCoder):
+            [coder(from:valueCoder).name,coder(from:windowCoder).name]
+        default:
+            []
+        }
+        return coder { _ in
+                .with {
+                    $0.spec = .with {
+                        $0.urn = from.urn
+                        if case .custom(let data) = from {
+                            $0.payload = data
+                        }
+                    }
+                    $0.componentCoderIds = componentCoders
+                }
+        }
+    }
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift
new file mode 100644
index 00000000000..5207748237f
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift
@@ -0,0 +1,38 @@
+extension PipelineProto {
+    
+    mutating func environment(from: Environment) throws -> PipelineComponent {
+        return try environment { _ in
+            try .with { p in
+                p.urn = switch from.category {
+                case .docker(_): .beamUrn("docker",type:"env")
+                case .system: .beamUrn("default",type:"env")
+                case .process(_, _, _, _): .beamUrn("process",type:"env")
+                case .external(_): .beamUrn("external",type:"env")
+                }
+                p.capabilities = from.capabilities
+                p.dependencies = from.dependencies.map({ $0.proto })
+                
+                if case let .docker(containerImage) = from.category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with {
+                        $0.containerImage = containerImage
+                    }.serializedData()
+                }
+                
+                if case let .external(endpoint) = from.category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with {
+                        $0.endpoint = endpoint.proto
+                    }.serializedData()
+                }
+                
+                if case let .process(command,arch,os,env) = from.category {
+                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with {
+                        $0.arch = arch
+                        $0.command = command
+                        $0.os = os
+                        $0.env = env
+                    }.serializedData()
+                }
+            }
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
index 52f2b28aef1..54c3ae4586e 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -24,6 +24,18 @@ public extension PCollection {
             }
         }
     }
+    
+    func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> 
PCollection<KV<K,V>> where Of == KV<K,V> {
+        pardo(name,prefix) { prefix,input,output in
+            for await element in input {
+                let kv = element.0
+                for v in kv.values {
+                    print("\(prefix): \(kv.key),\(v)")
+                }
+                output.emit(element)
+            }
+        }
+    }
 }
 
 /// Modifying Values
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 3e2d6a3be7d..625ba1eafbd 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -34,7 +34,7 @@ final class IntegrationTests: XCTestCase {
     }
 
     func testPortableWordcount() throws {
-        _ = Pipeline { pipeline in
+        try Pipeline { pipeline in
             let (contents,errors) = pipeline
                 .create(["file1.txt","file2.txt","missing.txt"])
                 .pardo { filenames,output,errors in
@@ -67,9 +67,12 @@ final class IntegrationTests: XCTestCase {
                  $0.value ?? 1)
             }.sum()
             
+            _ = normalizedCounts.log(prefix:"COUNT OUTPUT")
+            _ = errors.log(prefix:"ERROR OUTPUT")
             
-            
-        }
+        }.run()
+        
+        
     }
 
 

Reply via email to