This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 9471e33631054f6d941d958e8b9b5abcbf4ff9d5
Author: Byron Ellis <[email protected]>
AuthorDate: Tue Aug 22 14:12:44 2023 -0700

    Added backwards links in PCollections to support Composite transforms. 
Added a very basic composite version of WordCount to the test suite. Still need 
to hook up the graph generation, which requires searching for the roots from 
the leaves to give it more of that "SwiftUI" style.
---
 sdks/swift/Package.resolved                        |  9 +++
 sdks/swift/Package.swift                           |  4 +-
 .../Core/PCollection/AnyPCollection.swift          | 11 +++-
 .../ApacheBeam/Core/PCollection/PCollection.swift  | 17 ++++-
 .../Core/PTransform/OutputPTransform.swift         | 10 ---
 .../ApacheBeam/Core/PTransform/Outputs.swift       | 18 +++++
 .../ApacheBeam/Core/PTransform/PTransform.swift    |  1 +
 .../Core/PTransform/PTransformBuilder.swift        | 37 ++++++++++-
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        | 19 ++++--
 .../ApacheBeam/Core/Pipeline/PipelineContext.swift |  5 ++
 .../Core/Pipeline/PipelineTransform.swift          | 12 ++--
 .../Sources/ApacheBeam/Transforms/Basic.swift      |  7 +-
 .../Sources/ApacheBeam/Transforms/BuiltIn.swift    | 74 +++++++++++++--------
 .../Sources/ApacheBeam/Transforms/Grouping.swift   |  2 +-
 .../Sources/ApacheBeam/Transforms/IO/DuckIO.swift  | 13 ++++
 .../ApacheBeam/Transforms/IO/ListFiles.swift       | 12 ++++
 .../Pipeline/CompositeIntegrationTests.swift       | 76 ++++++++++++++++++++++
 17 files changed, 269 insertions(+), 58 deletions(-)

diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved
index 7bf7847cda6..1a12c374c0c 100644
--- a/sdks/swift/Package.resolved
+++ b/sdks/swift/Package.resolved
@@ -36,6 +36,15 @@
         "version" : "1.3.2"
       }
     },
+    {
+      "identity" : "duckdb-swift",
+      "kind" : "remoteSourceControl",
+      "location" : "https://github.com/duckdb/duckdb-swift";,
+      "state" : {
+        "revision" : "439899503fa15a06d931947e9d055baaf6a9dd89",
+        "version" : "0.8.1"
+      }
+    },
     {
       "identity" : "google-auth-library-swift",
       "kind" : "remoteSourceControl",
diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift
index c5ca4db14eb..54cf8c5d88c 100644
--- a/sdks/swift/Package.swift
+++ b/sdks/swift/Package.swift
@@ -29,6 +29,7 @@ let dependencies: [Package.Dependency] = [
     // Additional Transform Dependencies
    .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: 
"0.23.0"),
    .package(url: 
"https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"),
+    .package(url: "https://github.com/duckdb/duckdb-swift", 
.upToNextMinor(from: .init(0, 8, 0))),
     
     
     // Swift Package Manager Plugins
@@ -55,7 +56,8 @@ let package = Package(
             dependencies: [
                 .product(name:"GRPC",package:"grpc-swift"),
                 .product(name: "Logging",package:"swift-log"),
-                .product(name: "AWSS3",package:"aws-sdk-swift")
+                .product(name: "AWSS3",package:"aws-sdk-swift"),
+                .product(name: "DuckDB", package: "duckdb-swift")
             ]
         ),
         .testTarget(
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 3e6af55f2a9..0b67e6068d8 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -24,7 +24,8 @@ public struct AnyPCollection : PCollectionProtocol {
     let ofType: Any.Type
     let collection: Any
     
-    let applyClosure: (Any,PipelineTransform) -> Void
+    let parentClosure: (Any) -> PipelineTransform?
+    let applyClosure: (Any,PipelineTransform) -> PipelineTransform
     let consumersClosure: (Any) -> [PipelineTransform]
     let coderClosure: (Any) -> Coder
     let streamClosure: (Any) -> AnyPCollectionStream
@@ -41,6 +42,7 @@ public struct AnyPCollection : PCollectionProtocol {
             self.consumersClosure = { ($0 as! C).consumers }
             self.coderClosure = { ($0 as! C).coder }
             self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
+            self.parentClosure = { ($0 as! C).parent }
         }
     }
     
@@ -49,11 +51,16 @@ public struct AnyPCollection : PCollectionProtocol {
         consumersClosure(collection)
     }
     
-    public func apply(_ transform: PipelineTransform) {
+    @discardableResult
+    public func apply(_ transform: PipelineTransform) -> PipelineTransform {
         applyClosure(collection,transform)
     }
 
     
+    public var parent: PipelineTransform? {
+        parentClosure(collection)
+    }
+    
     public var coder: Coder {
         coderClosure(collection)
     }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
index eec4c78c4eb..b8cf712be13 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -21,11 +21,13 @@ public protocol PCollectionProtocol {
     
     typealias Stream = PCollectionStream<Of>
     
+    var parent: PipelineTransform? { get }
     var consumers: [PipelineTransform] { get }
     var coder: Coder { get }
     var stream: Stream { get }
     
-    func apply(_ transform: PipelineTransform)
+    @discardableResult
+    func apply(_ transform: PipelineTransform) -> PipelineTransform
 }
 
 
@@ -33,18 +35,27 @@ public final class PCollection<Of> : PCollectionProtocol {
 
     public let coder: Coder
     public var consumers: [PipelineTransform]
+    public private(set) var parent: PipelineTransform?
     
-    public init(coder: Coder = .of(type: 
Of.self)!,consumers:[PipelineTransform] = []) {
+    public init(coder: Coder = .of(type: Of.self)!,parent: PipelineTransform? 
= nil,consumers:[PipelineTransform] = []) {
         self.coder = coder
         self.consumers = consumers
+        self.parent = parent
     }
 
     public var stream: PCollectionStream<Of> {
         return PCollectionStream<Of>()
     }
     
-    public func apply(_ transform: PipelineTransform) {
+    @discardableResult
+    public func apply(_ transform: PipelineTransform) -> PipelineTransform {
         consumers.append(transform)
+        return transform
     }
     
+    func parent(_ transform: PipelineTransform) {
+        self.parent = transform
+    }
+    
+    
 }
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift
deleted file mode 100644
index c9cb0b5c228..00000000000
--- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift
+++ /dev/null
@@ -1,10 +0,0 @@
-public struct OutputPTransform<Of> : _PrimitivePTransform {
-    let name: String
-    let applyClosure: (Any) -> PCollection<Of>
-
-    public init<In>(_ name:String,_ fn: @escaping (PCollection<In>) -> 
PCollection<Of>) {
-        self.name = name
-        self.applyClosure = { fn($0 as! PCollection<In>) }
-    }
-    
-}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift
new file mode 100644
index 00000000000..a0c246a438a
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift
@@ -0,0 +1,18 @@
+public struct NamedCollectionPTransform<Of> : _PrimitivePTransform {
+    let name: String
+    let collection: PCollection<Of>
+}
+
+/// Captures a single pcollection and gives it a name
+public struct Output<Of> : PTransform {
+    let name: String
+    let fn: () -> PCollection<Of>
+    public init(_ name:String,_ fn: @escaping () -> PCollection<Of>) {
+        self.name = name
+        self.fn   = fn
+    }
+    public var expand: NamedCollectionPTransform<Of> {
+        NamedCollectionPTransform(name: name, collection: fn())
+    }
+}
+
diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
index db7b87a5a73..f21ee3315a8 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift
@@ -20,6 +20,7 @@
 public protocol PTransform {
     associatedtype Expansion: PTransform
     
+    @PTransformBuilder
     var expand: Expansion { get }
 }
 
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift
index a1e2185c849..6c4cae36b1f 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift
@@ -1,9 +1,28 @@
 public struct EmptyPTransform : _PrimitivePTransform {
-    
+    public init() { }
 }
 
+public struct _ConditionalPTransform<TruePTransform,FalsePTransform> : 
_PrimitivePTransform
+    where TruePTransform : PTransform, FalsePTransform: PTransform 
+{
+    enum Storage {
+        case trueTransform(TruePTransform)
+        case falseTransform(FalsePTransform)
+    }
+    let storage: Storage
+}
 
-
+extension _ConditionalPTransform {
+    public var children: [AnyPTransform] {
+        switch storage {
+            
+        case .trueTransform(let transform):
+            return [AnyPTransform(transform)]
+        case .falseTransform(let transform):
+            return [AnyPTransform(transform)]
+        }
+    }
+}
 
 @resultBuilder
 public struct PTransformBuilder {
@@ -16,5 +35,17 @@ public struct PTransformBuilder {
         transform
     }
     
-    
+    public static func buildEither<TrueT,FalseT>(first: TrueT) -> 
_ConditionalPTransform<TrueT,FalseT> where TrueT: PTransform, FalseT: 
PTransform {
+        .init(storage: .trueTransform(first))
+    }
+
+    public static func buildEither<TrueT,FalseT>(second: FalseT) -> 
_ConditionalPTransform<TrueT,FalseT> where TrueT: PTransform, FalseT: 
PTransform {
+        .init(storage: .falseTransform(second))
+    }
+}
+
+public extension PTransformBuilder {
+    static func buildBlock<T0,T1>(_ t0: T0,_ t1: T1) -> 
TuplePTransform<(T0,T1)> where T0: PTransform,T1: PTransform {
+        TuplePTransform<(T0,T1)>(t0,t1)
+    }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index 180710aae73..b9cab1376a5 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -28,6 +28,15 @@ public final class Pipeline {
         self.content = content
     }
     
+    public init<Transform:PTransform>(log: Logging.Logger = 
.init(label:"Pipeline"),@PTransformBuilder content: () -> Transform) {
+        self.log = log
+        
+        let transform = content()
+        self.content = { root in
+            //TODO: Attach transform to the root
+        }
+    }
+    
     public func run(_ runner: PipelineRunner) async throws {
         try await runner.run(try self.context)        
     }
@@ -162,7 +171,7 @@ public final class Pipeline {
                         let inputs = parents.enumerated().map({ 
("\($0)","\($1.name)")}).dict()
                         switch pipelineTransform {
                             
-                        case let .pardo(n, fn, o):
+                        case let .pardo(_, n, fn, o):
                             let outputs = o.enumerated().map {
                                 ("\($0)",collection(from: $1).name)
                             }.dict()
@@ -187,7 +196,7 @@ public final class Pipeline {
                             rootIds.append(p.name) //TODO: Composite transform 
handling
                             fns[p.transform!.uniqueName] = fn
                             toVisit.append(contentsOf: o.map { .collection($0) 
})
-                        case .impulse(let o):
+                        case .impulse(_, let o):
                             let outputs = [o].enumerated().map {
                                 ("\($0)",collection(from: $1).name)
                             }.dict()
@@ -207,7 +216,7 @@ public final class Pipeline {
                             throw ApacheBeamError.runtimeError("flatten not 
implemented yet")
                         case .external:
                             throw ApacheBeamError.runtimeError("External 
Transforms not implemented yet")
-                        case .groupByKey(let o):
+                        case .groupByKey(_,let o):
                             let outputs = [o].enumerated().map {
                                 ("\($0)",collection(from: $1).name)
                             }.dict()
@@ -224,7 +233,7 @@ public final class Pipeline {
                             }
                             rootIds.append(p.name)
                             toVisit.append(.collection(o))
-                        case let .custom(urn,payload,env,o):
+                        case let .custom(_,urn,payload,env,o):
                             let outputs = o.enumerated().map {
                                 ("\($0)",collection(from: $1).name)
                             }.dict()
@@ -247,7 +256,7 @@ public final class Pipeline {
                             }
                             rootIds.append(p.name)
                             toVisit.append(contentsOf:o.map { .collection($0) 
})
-                        case .composite(_):
+                        case .composite(_,_):
                             throw ApacheBeamError.runtimeError("Composite 
transforms are not yet implemented")
                         }
                     } else if case .collection(let anyPCollection) = item {
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
index 733397fff52..b764014612f 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
@@ -16,6 +16,11 @@
  * limitations under the License.
  */
 
+
+public protocol StatusProtocol {
+    
+}
+
 public final class PipelineContext {
 
     var proto : PipelineProto
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
index ce164566db1..1ce5c5f6bdc 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift
@@ -21,13 +21,13 @@ import Foundation
 /// Enum for pipeline representable transforms as opposed to composite 
transforms
 /// which are a user-side construct represented by PTransform
 public enum PipelineTransform {
-    case pardo(String,SerializableFn,[AnyPCollection])
-    case impulse(AnyPCollection)
+    case pardo(AnyPCollection,String,SerializableFn,[AnyPCollection])
+    case impulse(AnyPCollection,AnyPCollection)
     case flatten([AnyPCollection],AnyPCollection)
-    case groupByKey(AnyPCollection)
-    case custom(String,Data,Environment?,[AnyPCollection])
-    case composite(AnyPTransform)
-    case external
+    case groupByKey(AnyPCollection,AnyPCollection)
+    case custom(AnyPCollection,String,Data,Environment?,[AnyPCollection])
+    case composite(AnyPCollection,AnyPTransform)
+    case external(AnyPCollection,[AnyPCollection])
 }
 
 
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
index be7b3318c03..af9e89b18bd 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -70,7 +70,7 @@ public extension PCollection {
         }
     }
     
-    func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
+    func map<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ 
fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> {
         return pardo(name:name ?? "\(_file):\(_line)") { input,output in
             for try await (i,ts,w) in input {
                 let (key,value) = fn(i)
@@ -98,3 +98,8 @@ public extension PCollection<Never> {
         return impulse().create(values,name:name,_file:_file,_line:_line)
     }
 }
+
+public func create<Value:Codable>(_ values: [Value],name:String? = 
nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> {
+    let root = PCollection<Never>()
+    return root.create(values,name:name,_file:_file,_line:_line)
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
index 7bea2c4f0c8..d33a576c42c 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift
@@ -23,7 +23,7 @@ public extension PCollection where Of == Never {
     /// which is the root transform used by Pipelines.
     func impulse() -> PCollection<Data> {
         let output = PCollection<Data>()
-        self.apply(.impulse(AnyPCollection(output)))
+        self.apply(.impulse(AnyPCollection(self),AnyPCollection(output)))
         return output
     }
 }
@@ -34,56 +34,56 @@ public extension PCollection {
     
     // No Output
     func pardo<F:SerializableFn>(name:String,_ fn: F) {
-        self.apply(.pardo(name, fn, []))
+        self.apply(.pardo(AnyPCollection(self),name, fn, []))
     }
     func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) 
async throws -> Void) {
-        self.apply(.pardo(name, ClosureFn(fn),[]))
+        self.apply(.pardo(AnyPCollection(self),name, ClosureFn(fn),[]))
     }
     func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream) async throws -> Void) {
-        self.apply(.pardo(name, ParameterizedClosureFn(param,fn), []))
+        self.apply(.pardo(AnyPCollection(self),name, 
ParameterizedClosureFn(param,fn), []))
     }
     
     // No Output Generated Names
     func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) 
{
-        self.apply(.pardo("\(_file):\(_line)", fn, []))
+        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, []))
     }
     func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream) async throws -> Void) {
-        self.apply(.pardo("\(_file):\(_line)", ClosureFn(fn),[]))
+        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", 
ClosureFn(fn),[]))
     }
     func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ 
param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async 
throws -> Void) {
-        self.apply(.pardo("\(_file):\(_line)", 
ParameterizedClosureFn(param,fn), []))
+        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", 
ParameterizedClosureFn(param,fn), []))
     }
 
 
     // Single Output
     func pardo<F:SerializableFn,O0>(name:String,fn: F,
                                     _ o0:PCollection<O0>) {
-        self.apply(.pardo(name, fn, [AnyPCollection(o0)]))
+        self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0)]))
     }
     func pardo<O0>(name:String,_ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
         let output = PCollection<O0>()
-        self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)]))
+        
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output)]))
         return output
     }
     func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable 
@escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> 
Void) -> (PCollection<O0>) {
         let output = PCollection<O0>()
-        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
+        
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
         return output
     }
     
     // Single Output Generated Names
     func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F,
                                     _ o0:PCollection<O0>) {
-        self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0)]))
+        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0)]))
     }
     func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable 
@escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) 
-> (PCollection<O0>) {
         let output = PCollection<O0>()
-        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))
+        
output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])))
         return output
     }
     func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: 
Param,_ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> 
(PCollection<O0>) {
         let output = PCollection<O0>()
-        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))
+        
output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])))
         return output
     }
 
@@ -92,36 +92,46 @@ public extension PCollection {
     // Two Outputs
     func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F,
                                     _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
-        self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)]))
+        self.apply(.pardo(AnyPCollection(self),name, fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
     }
     func pardo<O0,O1>(name:String,
                       _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
         return output
     }
     func pardo<Param:Codable,O0,O1>(name:String,_ param: Param,
                                     _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
         return output
     }
     
     // Two Outputs Generated Names
     func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
fn: F,
                                     _ o0:PCollection<O0>,_ o1:PCollection<O1>) 
{
-        self.apply(.pardo("\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1)]))
+        o0.parent(parent)
+        o1.parent(parent)
     }
     func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line,
                       _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async 
throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
         return output
     }
     func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
                                     _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) 
async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         let output = (PCollection<O0>(),PCollection<O1>())
-        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
         return output
     }
 
@@ -130,36 +140,48 @@ public extension PCollection {
     // Three Outputs
     func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F,
                                           _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
-        self.apply(.pardo(name, fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+        self.apply(.pardo(AnyPCollection(self),name, fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
     }
     func pardo<O0,O1,O2>(name:String,
                          _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        
self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
+        output.2.parent(parent)
         return output
     }
     func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param,
                                        _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        
self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
+        output.2.parent(parent)
         return output
     }
     
     // Three Outputs Generated Names
     func 
pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F,
                                           _ o0:PCollection<O0>,_ 
o1:PCollection<O1>,_ o2:PCollection<O2>) {
-        self.apply(.pardo("\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
+        self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, 
[AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)]))
     }
     func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line,
                          _ fn: @Sendable @escaping 
(PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        
self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
+        output.2.parent(parent)
         return output
     }
     func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ 
param: Param,
                                        _ fn: @Sendable @escaping 
(Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream)
 async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) {
         let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>())
-        
self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        let parent = 
self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)]))
+        output.0.parent(parent)
+        output.1.parent(parent)
+        output.2.parent(parent)
         return output
     }
 
@@ -172,7 +194,7 @@ public extension PCollection {
     func groupByKey<K,V>() -> PCollection<KV<K,V>> where Of == KV<K,V> {
         // Adjust the coder for the pcollection to reflect GBK semantcs
         let output = PCollection<KV<K,V>>(coder:.keyvalue(.of(type: K.self)!, 
.of(type: Array<V>.self)!))
-        self.apply(.groupByKey(AnyPCollection(output)))
+        self.apply(.groupByKey(AnyPCollection(self),AnyPCollection(output)))
         return output
     }
     
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
index 8cf2b46cfea..2b1b52402ea 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift
@@ -20,7 +20,7 @@
 ///
 public extension PCollection {
     func groupBy<K,V>(name:String? = 
nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> 
(K,V)) -> PCollection<KV<K,V>> {
-        return map(name,_file:_file,_line:_line,fn)
+        return map(name:name,_file:_file,_line:_line,fn)
             .groupByKey()
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift
new file mode 100644
index 00000000000..028e838527f
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift
@@ -0,0 +1,13 @@
+import DuckDB
+
+public struct DuckDB {
+    public enum ExternalTable : Codable {
+        case csv(String,String)
+        case json(String,String)
+        case parquet(String,String)
+    }
+    
+    var tables: [ExternalTable]
+    
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
index 2c6b79b6ee1..2ebb7f852d3 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift
@@ -26,6 +26,18 @@ struct GSBucket : Codable {
 
 public extension PCollection {
     
+    func listFiles(s3 bucket:String) {
+        
+    }
     
+    func listFiles(gs bucket:String) {
+    }
+    
+}
+
+public func listFiles(s3 bucket: String) {
+}
+
+public func listFiles(gs bucket: String) {
     
 }
diff --git 
a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
new file mode 100644
index 00000000000..f2ee142a966
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift
@@ -0,0 +1,76 @@
+//
+//  CompositeIntegrationTests.swift
+//  
+//
+//  Created by Byron Ellis on 8/22/23.
+//
+
+import ApacheBeam
+import XCTest
+
+/// Simple composite. Interesting the type resolution in composites doesn't 
work as well as other things? Not sure why that is. Not a huge deal.
+public struct FixtureWordCount : PTransform {
+    
+    let fixtures: [String]
+    public init(fixtures:[String]) {
+        self.fixtures = fixtures
+    }
+    
+    public var expand: some PTransform {
+        
+        let (contents,errors) = create(self.fixtures)
+            .pardo(name:"Read Files") { 
(filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in
+            for await (filename,_,_) in filenames {
+                do {
+                    output.emit(String(decoding:try 
fixtureData(filename),as:UTF8.self))
+                } catch {
+                    errors.emit("Unable to read \(filename): \(error)")
+                }
+            }
+        }
+        
+        let baseCount = contents.pardo { 
(contents,lines:PCollectionStream<String>) in
+            for await (content,_,_) in contents {
+                content.enumerateLines { line,_ in
+                    lines.emit(line)
+                }
+            }
+        }
+        .flatMap({ $0.components(separatedBy: .whitespaces) })
+        .groupBy({ ($0,1) })
+        .sum()
+        
+        Output("counts") {
+            baseCount.groupBy {
+                ($0.key.lowercased().trimmingCharacters(in: 
.punctuationCharacters),
+                 $0.value ?? 1)
+            }.sum()
+        }
+        
+        Output("errors") {
+            errors
+        }
+
+
+    }
+}
+
+
+/// Test cases for composite test
+final class CompositeIntegrationTests: XCTestCase {
+
+    override func setUpWithError() throws {
+    }
+
+    override func tearDownWithError() throws {
+    }
+
+    func testCompositeWordCount() async throws {
+        try await Pipeline {
+            FixtureWordCount(fixtures: ["file1.txt","file2.txt","missing.txt"])
+        }.run(PortableRunner(loopback:true))
+
+    }
+
+
+}

Reply via email to