This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 16e1ca311d1719bae8665ba7d7710efde9fc7b5b Author: Byron Ellis <[email protected]> AuthorDate: Thu Aug 17 15:53:53 2023 -0700 Second attempt and fixing the line values. --- sdks/swift/README.md | 6 +++ .../Sources/ApacheBeam/Transforms/Basic.swift | 29 ++++++------- .../Sources/ApacheBeam/Transforms/BuiltIn.swift | 48 +++++++++++----------- .../Sources/ApacheBeam/Transforms/Combining.swift | 8 ++-- .../Sources/ApacheBeam/Transforms/Grouping.swift | 4 +- 5 files changed, 51 insertions(+), 44 deletions(-) diff --git a/sdks/swift/README.md b/sdks/swift/README.md index 810ccae8b24..d48def0f481 100644 --- a/sdks/swift/README.md +++ b/sdks/swift/README.md @@ -16,4 +16,10 @@ let package = Package( ) ``` +## Writing a Pipeline + +``` +import ApacheBeam + + diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index 0b97a084ccf..9be08330b46 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -20,8 +20,9 @@ public extension PCollection { /// Each time the input fires output all of the values in this list. - func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> { - return pardo(name,values) { values,input,output in + func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { + + return pardo(name,_file:_file,_line:_line,values) { values,input,output in for try await (_,ts,w) in input { for v in values { output.emit(v,timestamp:ts,window:w) @@ -35,8 +36,8 @@ public extension PCollection { public extension PCollection { @discardableResult - func log(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<Of> where Of == String { - pardo(name,prefix) { prefix,input,output in + func log(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Of> where Of == String { + pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in for await element in input { print("\(prefix): \(element)") output.emit(element) @@ -45,8 +46,8 @@ public extension PCollection { } @discardableResult - func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<KV<K,V>> where Of == KV<K,V> { - pardo(name,prefix) { prefix,input,output in + func log<K,V>(prefix:String,_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> { + pardo(name,_file:_file,_line:_line,prefix) { prefix,input,output in for await element in input { let kv = element.0 for v in kv.values { @@ -62,16 +63,16 @@ public extension PCollection { public extension PCollection { /// Modify a value without changing its window or timestamp - func map<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { - return pardo(name) { input,output in + func map<Out>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> Out) -> PCollection<Out> { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (v,ts,w) in input { output.emit(fn(v),timestamp:ts,window:w) } } } - func map<K,V>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { - return pardo(name) { input,output in + func map<K,V>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (i,ts,w) in input { let (key,value) = fn(i) output.emit(KV(key,value),timestamp:ts,window:w) @@ -80,8 +81,8 @@ public extension PCollection { } /// Produce multiple outputs as a single value without modifying window or timestamp - func flatMap<Out>(name:String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { - return pardo(name) { input,output in + func flatMap<Out>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> [Out]) -> PCollection<Out> { + return pardo(name,_file:_file,_line:_line) { input,output in for try await (v,ts,w) in input { for i in fn(v) { output.emit(i,timestamp:ts,window:w) @@ -94,7 +95,7 @@ public extension PCollection { public extension PCollection<Never> { /// Convenience function to add an impulse when we are at the root of the pipeline - func create<Value:Codable>(_ values: [Value],_ name:String = "\(#file):\(#line)") -> PCollection<Value> { - return impulse().create(values,name) + func create<Value:Codable>(_ values: [Value],_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<Value> { + return impulse().create(values,name,_file:_file,_line:_line) } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift index 3684b239050..4cc3532da21 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn.swift @@ -33,68 +33,68 @@ public extension PCollection { // TODO: Replace with parameter pack version once https://github.com/apple/swift/issues/67192 is resolved // No Output - func pardo<F:SerializableFn>(_ name: String = "\(#file):\(#line)",_ fn: F) { - self.apply(.pardo(name, fn, [])) + func pardo<F:SerializableFn>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F) { + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [])) } - func pardo(_ name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name, ClosureFn(fn),[])) + func pardo(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name ?? "\(_file):\(_line)", ClosureFn(fn),[])) } - func pardo<Param:Codable>(_ name: String = "\(#file):\(#line)",_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { - self.apply(.pardo(name, ParameterizedClosureFn(param,fn), [])) + func pardo<Param:Codable>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param:Param,_ fn: @Sendable @escaping (Param,PCollection<Of>.Stream) async throws -> Void) { + self.apply(.pardo(name ?? "\(_file):\(_line)", ParameterizedClosureFn(param,fn), [])) } // Single Output - func pardo<F:SerializableFn,O0>(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo<F:SerializableFn,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0)])) } - func pardo<O0>(_ name: String = "\(#file):\(#line)", + func pardo<O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output)])) return output } - func pardo<Param:Codable,O0>(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo<Param:Codable,O0>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream) async throws -> Void) -> (PCollection<O0>) { let output = PCollection<O0>() - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output)])) return output } // Two Outputs - func pardo<F:SerializableFn,O0,O1>(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo<F:SerializableFn,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1)])) } - func pardo<O0,O1>(_ name: String = "\(#file):\(#line)", + func pardo<O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } - func pardo<Param:Codable,O0,O1>(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo<Param:Codable,O0,O1>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) { let output = (PCollection<O0>(),PCollection<O1>()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1)])) return output } // Three Outputs - func pardo<F:SerializableFn,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ fn: F, + func pardo<F:SerializableFn,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: F, _ o0:PCollection<O0>,_ o1:PCollection<O1>,_ o2:PCollection<O2>) { - self.apply(.pardo(name, fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)", fn, [AnyPCollection(o0),AnyPCollection(o1),AnyPCollection(o2)])) } - func pardo<O0,O1,O2>(_ name: String = "\(#file):\(#line)", + func pardo<O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line, _ fn: @Sendable @escaping (PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name,ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ClosureFn(fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } - func pardo<Param:Codable,O0,O1,O2>(_ name: String = "\(#file):\(#line)",_ param: Param, + func pardo<Param:Codable,O0,O1,O2>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line,_ param: Param, _ fn: @Sendable @escaping (Param,PCollection<Of>.Stream,PCollection<O0>.Stream,PCollection<O1>.Stream,PCollection<O2>.Stream) async throws -> Void) -> (PCollection<O0>,PCollection<O1>,PCollection<O2>) { let output = (PCollection<O0>(),PCollection<O1>(),PCollection<O2>()) - self.apply(.pardo(name,ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) + self.apply(.pardo(name ?? "\(_file):\(_line)",ParameterizedClosureFn(param,fn),[AnyPCollection(output.0),AnyPCollection(output.1),AnyPCollection(output.2)])) return output } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift index 642e76548ad..3ef57ebfd35 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift @@ -18,8 +18,8 @@ /// Basic reducers public extension PCollection { - func reduce<Result:Codable,K,V>(name:String = "\(#file):\(#line)",into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> { - return pardo(name,into) { initialValue,input,output in + func reduce<Result:Codable,K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,into:Result,_ accumulator: @Sendable @escaping (V,inout Result) -> Void) -> PCollection<KV<K,Result>> where Of == KV<K,V> { + return pardo(name,_file:_file,_line:_line,into) { initialValue,input,output in for await (kv,ts,w) in input { var result = initialValue for v in kv.values { @@ -34,7 +34,7 @@ public extension PCollection { /// Convenience functions public extension PCollection { - func sum<K,V:Numeric&Codable>() -> PCollection<KV<K,V>> where Of == KV<K,V> { - return reduce(into: 0,{ a,b in b = b + a }) + func sum<K,V:Numeric&Codable>(_ name:String? = nil,_file:String=#fileID,_line:Int=#line) -> PCollection<KV<K,V>> where Of == KV<K,V> { + return reduce(name:name,_file:_file,_line:_line,into: 0,{ a,b in b = b + a }) } } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift index ed98a8577e6..8cf2b46cfea 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Grouping.swift @@ -19,8 +19,8 @@ /// Basic grouping functionality /// public extension PCollection { - func groupBy<K,V>(name: String = "\(#file):\(#line)",_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { - return map(name:name,fn) + func groupBy<K,V>(name:String? = nil,_file:String=#fileID,_line:Int=#line,_ fn: @Sendable @escaping (Of) -> (K,V)) -> PCollection<KV<K,V>> { + return map(name,_file:_file,_line:_line,fn) .groupByKey() } }
