This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9471e33631054f6d941d958e8b9b5abcbf4ff9d5 Author: Byron Ellis <[email protected]> AuthorDate: Tue Aug 22 14:12:44 2023 -0700 Added backwards links in PCollections to support Composite transforms. Added a very basic composite version of WordCount to the test suite. Still need to hook up the graph generation, which requires searching for the roots from the leaves to give it more of that "SwiftUI" style. --- sdks/swift/Package.resolved | 9 +++ sdks/swift/Package.swift | 4 +- .../Core/PCollection/AnyPCollection.swift | 11 +++- .../ApacheBeam/Core/PCollection/PCollection.swift | 17 ++++- .../Core/PTransform/OutputPTransform.swift | 10 --- .../ApacheBeam/Core/PTransform/Outputs.swift | 18 +++++ .../ApacheBeam/Core/PTransform/PTransform.swift | 1 + .../Core/PTransform/PTransformBuilder.swift | 37 ++++++++++- .../ApacheBeam/Core/Pipeline/Pipeline.swift | 19 ++++-- .../ApacheBeam/Core/Pipeline/PipelineContext.swift | 5 ++ .../Core/Pipeline/PipelineTransform.swift | 12 ++-- .../Sources/ApacheBeam/Transforms/Basic.swift | 7 +- .../Sources/ApacheBeam/Transforms/BuiltIn.swift | 74 +++++++++++++-------- .../Sources/ApacheBeam/Transforms/Grouping.swift | 2 +- .../Sources/ApacheBeam/Transforms/IO/DuckIO.swift | 13 ++++ .../ApacheBeam/Transforms/IO/ListFiles.swift | 12 ++++ .../Pipeline/CompositeIntegrationTests.swift | 76 ++++++++++++++++++++++ 17 files changed, 269 insertions(+), 58 deletions(-) diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved index 7bf7847cda6..1a12c374c0c 100644 --- a/sdks/swift/Package.resolved +++ b/sdks/swift/Package.resolved @@ -36,6 +36,15 @@ "version" : "1.3.2" } }, + { + "identity" : "duckdb-swift", + "kind" : "remoteSourceControl", + "location" : "https://github.com/duckdb/duckdb-swift", + "state" : { + "revision" : "439899503fa15a06d931947e9d055baaf6a9dd89", + "version" : "0.8.1" + } + }, { "identity" : "google-auth-library-swift", "kind" : "remoteSourceControl", diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift index c5ca4db14eb..54cf8c5d88c 100644 --- a/sdks/swift/Package.swift +++ b/sdks/swift/Package.swift @@ -29,6 +29,7 @@ let dependencies: [Package.Dependency] = [ // Additional Transform Dependencies .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "0.23.0"), .package(url: "https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"), + .package(url: "https://github.com/duckdb/duckdb-swift", .upToNextMinor(from: .init(0, 8, 0))), // Swift Package Manager Plugins @@ -55,7 +56,8 @@ let package = Package( dependencies: [ .product(name:"GRPC",package:"grpc-swift"), .product(name: "Logging",package:"swift-log"), - .product(name: "AWSS3",package:"aws-sdk-swift") + .product(name: "AWSS3",package:"aws-sdk-swift"), + .product(name: "DuckDB", package: "duckdb-swift") ] ), .testTarget( diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift index 3e6af55f2a9..0b67e6068d8 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift @@ -24,7 +24,8 @@ public struct AnyPCollection : PCollectionProtocol { let ofType: Any.Type let collection: Any - let applyClosure: (Any,PipelineTransform) -> Void + let parentClosure: (Any) -> PipelineTransform? + let applyClosure: (Any,PipelineTransform) -> PipelineTransform let consumersClosure: (Any) -> [PipelineTransform] let coderClosure: (Any) -> Coder let streamClosure: (Any) -> AnyPCollectionStream @@ -41,6 +42,7 @@ public struct AnyPCollection : PCollectionProtocol { self.consumersClosure = { ($0 as! C).consumers } self.coderClosure = { ($0 as! C).coder } self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) } + self.parentClosure = { ($0 as! C).parent } } } @@ -49,11 +51,16 @@ public struct AnyPCollection : PCollectionProtocol { consumersClosure(collection) } - public func apply(_ transform: PipelineTransform) { + @discardableResult + public func apply(_ transform: PipelineTransform) -> PipelineTransform { applyClosure(collection,transform) } + public var parent: PipelineTransform? { + parentClosure(collection) + } + public var coder: Coder { coderClosure(collection) } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift index eec4c78c4eb..b8cf712be13 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift @@ -21,11 +21,13 @@ public protocol PCollectionProtocol { typealias Stream = PCollectionStream<Of> + var parent: PipelineTransform? { get } var consumers: [PipelineTransform] { get } var coder: Coder { get } var stream: Stream { get } - func apply(_ transform: PipelineTransform) + @discardableResult + func apply(_ transform: PipelineTransform) -> PipelineTransform } @@ -33,18 +35,27 @@ public final class PCollection<Of> : PCollectionProtocol { public let coder: Coder public var consumers: [PipelineTransform] + public private(set) var parent: PipelineTransform? - public init(coder: Coder = .of(type: Of.self)!,consumers:[PipelineTransform] = []) { + public init(coder: Coder = .of(type: Of.self)!,parent: PipelineTransform? = nil,consumers:[PipelineTransform] = []) { self.coder = coder self.consumers = consumers + self.parent = parent } public var stream: PCollectionStream<Of> { return PCollectionStream<Of>() } - public func apply(_ transform: PipelineTransform) { + @discardableResult + public func apply(_ transform: PipelineTransform) -> PipelineTransform { consumers.append(transform) + return transform } + func parent(_ transform: PipelineTransform) { + self.parent = transform + } + + } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift deleted file mode 100644 index c9cb0b5c228..00000000000 --- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/OutputPTransform.swift +++ /dev/null @@ -1,10 +0,0 @@ -public struct OutputPTransform<Of> : _PrimitivePTransform { - let name: String - let applyClosure: (Any) -> PCollection<Of> - - public init<In>(_ name:String,_ fn: @escaping (PCollection<In>) -> PCollection<Of>) { - self.name = name - self.applyClosure = { fn($0 as! PCollection<In>) } - } - -} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift new file mode 100644 index 00000000000..a0c246a438a --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift @@ -0,0 +1,18 @@ +public struct NamedCollectionPTransform<Of> : _PrimitivePTransform { + let name: String + let collection: PCollection<Of> +} + +/// Captures a single pcollection and gives it a name +public struct Output<Of> : PTransform { + let name: String + let fn: () -> PCollection<Of> + public init(_ name:String,_ fn: @escaping () -> PCollection<Of>) { + self.name = name + self.fn = fn + } + public var expand: NamedCollectionPTransform<Of> { + NamedCollectionPTransform(name: name, collection: fn()) + } +} + diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift index db7b87a5a73..f21ee3315a8 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransform.swift @@ -20,6 +20,7 @@ public protocol PTransform { associatedtype Expansion: PTransform + @PTransformBuilder var expand: Expansion { get } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift index a1e2185c849..6c4cae36b1f 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/PTransformBuilder.swift @@ -1,9 +1,28 @@ public struct EmptyPTransform : _PrimitivePTransform { - + public init() { } } +public struct _ConditionalPTransform<TruePTransform,FalsePTransform> : _PrimitivePTransform + where TruePTransform : PTransform, FalsePTransform: PTransform +{ + enum Storage { + case trueTransform(TruePTransform) + case falseTransform(FalsePTransform) + } + let storage: Storage +} - +extension _ConditionalPTransform { + public var children: [AnyPTransform] { + switch storage { + + case .trueTransform(let transform): + return [AnyPTransform(transform)] + case .falseTransform(let transform): + return [AnyPTransform(transform)] + } + } +} @resultBuilder public struct PTransformBuilder { @@ -16,5 +35,17 @@ public struct PTransformBuilder { transform } - + public static func buildEither<TrueT,FalseT>(first: TrueT) -> _ConditionalPTransform<TrueT,FalseT> where TrueT: PTransform, FalseT: PTransform { + .init(storage: .trueTransform(first)) + } + + public static func buildEither<TrueT,FalseT>(second: FalseT) -> _ConditionalPTransform<TrueT,FalseT> where TrueT: PTransform, FalseT: PTransform { + .init(storage: .falseTransform(second)) + } +} + +public extension PTransformBuilder { + static func buildBlock<T0,T1>(_ t0: T0,_ t1: T1) -> TuplePTransform<(T0,T1)> where T0: PTransform,T1: PTransform { + TuplePTransform<(T0,T1)>(t0,t1) + } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift index 180710aae73..b9cab1376a5 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -28,6 +28,15 @@ public final class Pipeline { self.content = content } + public init<Transform:PTransform>(log: Logging.Logger = .init(label:"Pipeline"),@PTransformBuilder content: () -> Transform) { + self.log = log + + let transform = content() + self.content = { root in + //TODO: Attach transform to the root + } + } + public func run(_ runner: PipelineRunner) async throws { try await runner.run(try self.context) } @@ -162,7 +171,7 @@ public final class Pipeline { let inputs = parents.enumerated().map({ ("\($0)","\($1.name)")}).dict() switch pipelineTransform { - case let .pardo(n, fn, o): + case let .pardo(_, n, fn, o): let outputs = o.enumerated().map { ("\($0)",collection(from: $1).name) }.dict() @@ -187,7 +196,7 @@ public final class Pipeline { rootIds.append(p.name) //TODO: Composite transform handling fns[p.transform!.uniqueName] = fn toVisit.append(contentsOf: o.map { .collection($0) }) - case .impulse(let o): + case .impulse(_, let o): let outputs = [o].enumerated().map { ("\($0)",collection(from: $1).name) }.dict() @@ -207,7 +216,7 @@ public final class Pipeline { throw ApacheBeamError.runtimeError("flatten not implemented yet") case .external: throw ApacheBeamError.runtimeError("External Transforms not implemented yet") - case .groupByKey(let o): + case .groupByKey(_,let o): let outputs = [o].enumerated().map { ("\($0)",collection(from: $1).name) }.dict() @@ -224,7 +233,7 @@ public final class Pipeline { } rootIds.append(p.name) toVisit.append(.collection(o)) - case let .custom(urn,payload,env,o): + case let .custom(_,urn,payload,env,o): let outputs = o.enumerated().map { ("\($0)",collection(from: $1).name) }.dict() @@ -247,7 +256,7 @@ public final class Pipeline { } rootIds.append(p.name) toVisit.append(contentsOf:o.map { .collection($0) }) - case .composite(_): + case .composite(_,_): throw ApacheBeamError.runtimeError("Composite transforms are not yet implemented") } } else if case .collection(let anyPCollection) = item { diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift index 733397fff52..b764014612f 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift @@ -16,6 +16,11 @@ * limitations under the License. */ + +public protocol StatusProtocol { + +} + public final class PipelineContext { var proto : PipelineProto diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift index ce164566db1..1ce5c5f6bdc 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift @@ -21,13 +21,13 @@ import Foundation /// Enum for pipeline representable transforms as opposed to composite transforms /// which are a user-side construct represented by PTransform public enum PipelineTransform { - case pardo(String,SerializableFn,[AnyPCollection]) - case impulse(AnyPCollection) + case pardo(AnyPCollection,String,SerializableFn,[AnyPCollection]) + case impulse(AnyPCollection,AnyPCollection) case flatten([AnyPCollection],AnyPCollection) - case groupByKey(AnyPCollection) - case custom(String,Data,Environment?,[AnyPCollection]) - case composite(AnyPTransform) - case external + case groupByKey(AnyPCollection,AnyPCollection) + case custom(AnyPCollection,String,Data,Environment?,[AnyPCollection]) + case composite(AnyPCollection,AnyPTransform) + case external(AnyPCollection,[AnyPCollection]) } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index be7b3318c03..af9e89b18bd 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -70,7 +70,7 @@ public extension PCollection { } } - func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { + func map<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { return pardo(name:name ?? "\(_file):\(_line)") { input,output in for try await (i,ts,w) in input { let (key,value) = fn(i) @@ -98,3 +98,8 @@ public extension PCollection<Never> { return impulse().create(values,name:name,_file:_file,_line:_line) } } + +public func create<Value:Codable>(_ values: [Value],name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { + let root = PCollection<Never>() + return root.create(values,name:name,_file:_file,_line:_line) +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift index 7bea2c4f0c8..d33a576c42c 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -23,7 +23,7 @@ public extension PCollection where Of == Never { /// which is the root transform used by Pipelines. func impulse() -> PCollection<Data> { let output = PCollection<Data>() - self.apply(.impulse(AnyPCollection(output))) + self.apply(.impulse(AnyPCollection(self),AnyPCollection(output))) return output } } @@ -34,56 +34,56 @@ public extension PCollection { // No Output func pardo<F:SerializableFn>(name:String,_ fn: F) { - self.apply(.pardo(name, fn, [])) + self.apply(.pardo(AnyPCollection(self),name, fn, [])) } func pardo(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name, ClosureFn(fn),[])) + self.apply(.pardo(AnyPCollection(self),name, ClosureFn(fn),[])) } func pardo<Param:Codable>(name:String,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name, ParameterizedClosureFn(param,fn), [])) + self.apply(.pardo(AnyPCollection(self),name, ParameterizedClosureFn(param,fn), [])) } // No Output Generated Names func pardo<F:SerializableFn>(_file:String=#fileID,_line:Int=#line,_ fn: F) { - self.apply(.pardo("\(_file):\(_line)", fn, [])) + self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [])) } func pardo(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo("\(_file):\(_line)", ClosureFn(fn),[])) + self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", ClosureFn(fn),[])) } func pardo<Param:Codable>(_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo("\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) + self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) } // Single Output func pardo<F:SerializableFn,O0>(name:String,fn: F, _ o0:PCollection<O0>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0)])) + self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0)])) } func pardo<O0>(name:String,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)])) + self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output)])) return output } func pardo<Param:Codable,O0>(name:String,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) return output } // Single Output Generated Names func pardo<F:SerializableFn,O0>(_file:String=#fileID,_line:Int=#line,fn: F, _ o0:PCollection<O0>) { - self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0)])) + self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0)])) } func pardo<O0>(_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])) + output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)]))) return output } func pardo<Param:Codable,O0>(_file:String=#fileID,_line:Int=#line,_ param: Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + output.parent(self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)]))) return output } @@ -92,36 +92,46 @@ public extension PCollection { // Two Outputs func pardo<F:SerializableFn,O0,O1>(name:String,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) + self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) } func pardo<O0,O1>(name:String, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + let parent = self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + output.0.parent(parent) + output.1.parent(parent) return output } func pardo<Param:Codable,O0,O1>(name:String,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + let parent = self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + output.0.parent(parent) + output.1.parent(parent) return output } // Two Outputs Generated Names func pardo<F:SerializableFn,O0,O1>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { - self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) + let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) + o0.parent(parent) + o1.parent(parent) } func pardo<O0,O1>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + output.0.parent(parent) + output.1.parent(parent) return output } func pardo<Param:Codable,O0,O1>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + output.0.parent(parent) + output.1.parent(parent) return output } @@ -130,36 +140,48 @@ public extension PCollection { // Three Outputs func pardo<F:SerializableFn,O0,O1,O2>(name:String,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + self.apply(.pardo(AnyPCollection(self),name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } func pardo<O0,O1,O2>(name:String, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + let parent = self.apply(.pardo(AnyPCollection(self),name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + output.0.parent(parent) + output.1.parent(parent) + output.2.parent(parent) return output } func pardo<Param:Codable,O0,O1,O2>(name:String,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + let parent = self.apply(.pardo(AnyPCollection(self),name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + output.0.parent(parent) + output.1.parent(parent) + output.2.parent(parent) return output } // Three Outputs Generated Names func pardo<F:SerializableFn,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { - self.apply(.pardo("\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } func pardo<O0,O1,O2>(_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo("\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + output.0.parent(parent) + output.1.parent(parent) + output.2.parent(parent) return output } func pardo<Param:Codable,O0,O1,O2>(_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo("\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + let parent = self.apply(.pardo(AnyPCollection(self),"\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + output.0.parent(parent) + output.1.parent(parent) + output.2.parent(parent) return output } @@ -172,7 +194,7 @@ public extension PCollection { func groupByKey<K,V>() -> PCollection<KV<K,V>> where Of == KV<K,V> { // Adjust the coder for the pcollection to reflect GBK semantcs let output = PCollection<KV<K,V>>(coder:.keyvalue(.of(type: K.self)!, .of(type: Array<V>.self)!)) - self.apply(.groupByKey(AnyPCollection(output))) + self.apply(.groupByKey(AnyPCollection(self),AnyPCollection(output))) return output } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift index 8cf2b46cfea..2b1b52402ea 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift @@ -20,7 +20,7 @@ /// public extension PCollection { func groupBy<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { - return map(name,_file:_file,_line:_line,fn) + return map(name:name,_file:_file,_line:_line,fn) .groupByKey() } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift new file mode 100644 index 00000000000..028e838527f --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/DuckIO.swift @@ -0,0 +1,13 @@ +import DuckDB + +public struct DuckDB { + public enum ExternalTable : Codable { + case csv(String,String) + case json(String,String) + case parquet(String,String) + } + + var tables: [ExternalTable] + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift index 2c6b79b6ee1..2ebb7f852d3 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift @@ -26,6 +26,18 @@ struct GSBucket : Codable { public extension PCollection { + func listFiles(s3 bucket:String) { + + } + func listFiles(gs bucket:String) { + } + +} + +public func listFiles(s3 bucket: String) { +} + +public func listFiles(gs bucket: String) { } diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift new file mode 100644 index 00000000000..f2ee142a966 --- /dev/null +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift @@ -0,0 +1,76 @@ +// +// CompositeIntegrationTests.swift +// +// +// Created by Byron Ellis on 8/22/23. +// + +import ApacheBeam +import XCTest + +/// Simple composite. Interesting the type resolution in composites doesn't work as well as other things? Not sure why that is. Not a huge deal. +public struct FixtureWordCount : PTransform { + + let fixtures: [String] + public init(fixtures:[String]) { + self.fixtures = fixtures + } + + public var expand: some PTransform { + + let (contents,errors) = create(self.fixtures) + .pardo(name:"Read Files") { (filenames,output:PCollectionStream<String>,errors:PCollectionStream<String>) in + for await (filename,_,_) in filenames { + do { + output.emit(String(decoding:try fixtureData(filename),as:UTF8.self)) + } catch { + errors.emit("Unable to read \(filename): \(error)") + } + } + } + + let baseCount = contents.pardo { (contents,lines:PCollectionStream<String>) in + for await (content,_,_) in contents { + content.enumerateLines { line,_ in + lines.emit(line) + } + } + } + .flatMap({ $0.components(separatedBy: .whitespaces) }) + .groupBy({ ($0,1) }) + .sum() + + Output("counts") { + baseCount.groupBy { + ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters), + $0.value ?? 1) + }.sum() + } + + Output("errors") { + errors + } + + + } +} + + +/// Test cases for composite test +final class CompositeIntegrationTests: XCTestCase { + + override func setUpWithError() throws { + } + + override func tearDownWithError() throws { + } + + func testCompositeWordCount() async throws { + try await Pipeline { + FixtureWordCount(fixtures: ["file1.txt","file2.txt","missing.txt"]) + }.run(PortableRunner(loopback:true)) + + } + + +}
