This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 534b3e4c2c9b50c9ff9e1d2f7c096ec55cdda3a3 Author: Byron Ellis <[email protected]> AuthorDate: Tue Aug 15 10:07:59 2023 -0700 Moving some stuff around to make the proto construction code easier to read --- sdks/swift/README.md | 19 ++ .../Sources/ApacheBeam/Core/Environment.swift | 40 ---- .../ApacheBeam/Core/Fn/SerializableFn.swift | 2 + sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift | 6 +- .../Core/PCollection/AnyPCollection.swift | 9 + .../ApacheBeam/Core/Pipeline/Pipeline.swift | 245 +++++++++++++++++++++ .../ApacheBeam/Core/Pipeline/PipelineContext.swift | 11 + .../Core/Pipeline/PipelineTransform.swift | 2 +- sdks/swift/Sources/ApacheBeam/Core/Windowing.swift | 4 - .../ApacheBeam/Internal/Array+Helpers.swift | 5 + .../ApacheBeam/Internal/PipelineProto+Coder.swift | 28 +++ .../Internal/PipelineProto+Environment.swift | 38 ++++ .../Sources/ApacheBeam/Transforms/Basic.swift | 12 + .../Pipeline/IntegrationTests.swift | 9 +- 14 files changed, 381 insertions(+), 49 deletions(-) diff --git a/sdks/swift/README.md b/sdks/swift/README.md new file mode 100644 index 00000000000..810ccae8b24 --- /dev/null +++ b/sdks/swift/README.md @@ -0,0 +1,19 @@ +# Swift SDK for Beam + +The Swift SDK for Beam is a "portable" Beam implementation written in native Swift. 
+ +## Usage + +To use the Swift SDK for Beam you should add it to your own executable package as a dependency: +``` +let package = Package( + dependencies:[ + .package(url:"https://github.com/apache/beam/sdks/swift", from: "2.51.0") + ], + targets:[ + // targets + ] +) +``` + + diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift index f3b0fb85cbc..fd09bf454e2 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift @@ -42,43 +42,3 @@ public struct Environment { } } - -extension Environment : ProtoConversion { - typealias Proto = EnvironmentProto - - var proto: EnvironmentProto { - get throws { - return try .with { p in - p.urn = switch category { - case .docker(_): .beamUrn("docker",type:"env") - case .system: .beamUrn("default",type:"env") - case .process(_, _, _, _): .beamUrn("process",type:"env") - case .external(_): .beamUrn("external",type:"env") - } - p.capabilities = capabilities - p.dependencies = dependencies.map({ $0.proto }) - - if case let .docker(containerImage) = category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with { - $0.containerImage = containerImage - }.serializedData() - } - - if case let .external(endpoint) = category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with { - $0.endpoint = endpoint.proto - }.serializedData() - } - - if case let .process(command,arch,os,env) = category { - p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with { - $0.arch = arch - $0.command = command - $0.os = os - $0.env = env - }.serializedData() - } - } - } - } -} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift index babf64dfe31..717b604deb1 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift @@ 
-28,11 +28,13 @@ public struct SerializableFnBundleContext { /// SerialiableFn is a protocol for functions that should be parameterized for the pipeline. This is intended as a fairly low level class and users /// should interact with the apply() functions defined in the transform section or implement the DoFn protocol which is then wrapped public protocol SerializableFn { + var urn: String { get } var payload: Data { get throws } func process(context:SerializableFnBundleContext,inputs:[AnyPCollectionStream],outputs:[AnyPCollectionStream]) async throws -> (String,String) } /// Provide some defaults where our function doesn't have any payload public extension SerializableFn { + var urn: String { .beamUrn("dofn",type:"swift:transform") } var payload : Data { Data() } } diff --git a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift index f8cd8f735c0..278ccb609cb 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift @@ -59,4 +59,8 @@ public struct KV<Key,Value> : AnyKeyValue { public var anyValue: Any? { value } } - +extension KV : Beamable { + public static var coder: Coder { + return .keyvalue(.of(type: Key.self)!, .of(type: Value.self)!) 
+ } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift index 8f23dccb954..9d2aa8e8c9f 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift @@ -49,3 +49,12 @@ public struct AnyPCollection : PCollectionProtocol { } } + +extension AnyPCollection : Hashable { + public func hash(into hasher: inout Hasher) { + hasher.combine(ObjectIdentifier(self.collection as AnyObject)) + } + public static func ==(lhs: AnyPCollection,rhs:AnyPCollection) -> Bool { + return ObjectIdentifier(lhs.collection as AnyObject) == ObjectIdentifier(rhs.collection as AnyObject) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift index f918a620ea1..ece88ed6799 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -10,5 +10,250 @@ public final class Pipeline { self.content = content } + public func run(loopback:Bool = false) throws { + let job = try context + log.info("\(job.proto)") + + var proto = job.proto + if loopback { + //If we are in loopback mode we want to replace the default environment + } + + } + + + /// For managing the pipeline items to visit + enum Visit { + case transform([PipelineComponent],PipelineTransform) + case collection(AnyPCollection) + } + var context: PipelineContext { + get throws { + // Grab the pipeline content using an new root + var root = PCollection<Never>(coder:.unknown(.coderUrn("never"))) + _ = content(&root) + + // These get passed to the pipeline context + var collections: [String:AnyPCollection] = [:] + var fns: [String:SerializableFn] = [:] + var coders: [String:Coder] = [:] + var counter: Int = 1 + + // These caches are just used internally + var collectionCache: 
[AnyPCollection:PipelineComponent] = [:] + var coderCache: [Coder:PipelineComponent] = [:] + var rootIds: [String] = [] + + var defaultEnvironment: PipelineComponent = .none + + //TODO: Support for composite PTransforms + let pipeline: PipelineProto = try .with { proto in + + func uniqueName(_ prefix: String = "id") -> String { + let output = "\(prefix)\(counter)" + counter = counter + 1 + return output + } + + + + /// We need to define this inside the with to prevent concurrent access errors. + func coder(from:Coder) -> PipelineComponent { + if let cached = coderCache[from] { + return cached + } + let componentCoders:[String] = switch from { + case let .keyvalue(keyCoder, valueCoder): + [coder(from:keyCoder).name,coder(from:valueCoder).name] + case let .iterable(valueCoder): + [coder(from:valueCoder).name] + case let .lengthprefix(valueCoder): + [coder(from:valueCoder).name] + case let .windowedvalue(valueCoder, windowCoder): + [coder(from:valueCoder).name,coder(from:windowCoder).name] + default: + [] + } + let baseCoder = proto.coder { _ in + .with { + $0.spec = .with { + $0.urn = from.urn + if case .custom(let data) = from { + $0.payload = data + } + } + $0.componentCoderIds = componentCoders + } + } + coderCache[from] = baseCoder + coders[baseCoder.name] = from + return baseCoder + } + + /// Define the default environment for this pipeline + defaultEnvironment = try proto.environment(from:.init(.docker("swift:image"), + capabilities:Coder.capabilities, + dependencies:[])) + + /// Define the default strategy + let globalWindow = coder(from:.globalwindow) + let defaultStrategy = proto.windowingStrategy { _ in + .with { + $0.windowCoderID = globalWindow.name + $0.windowFn = .with { + $0.urn = .beamUrn("global_windows",type:"window_fn") + } + $0.mergeStatus = .nonMerging + $0.trigger = .with { + $0.default = .init() + } + $0.accumulationMode = .discarding + $0.outputTime = .endOfWindow + $0.closingBehavior = .emitIfNonempty + $0.onTimeBehavior = .fireIfNonempty + 
$0.environmentID = defaultEnvironment.name + } + } + + + /// As above we define this within the "with" to prevent concurrent access errors. + func collection(from collection:AnyPCollection) -> PipelineComponent { + if let cached = collectionCache[collection] { + return cached + } + let coder = coder(from:collection.coder) + let output = proto.collection { _ in + .with { + $0.uniqueName = uniqueName("c") + $0.coderID = coder.name + $0.isBounded = .bounded //TODO: Get this from the coder + } + } + collectionCache[collection] = output + collections[output.name] = collection + return output + } + + func transform(name:String = "",_ fn: @escaping (String,String) throws -> PTransformProto) throws -> PipelineComponent { + return try proto.transform { ref in + return try fn(ref,name.count > 0 ? name+uniqueName(".t") : uniqueName("t")) + } + } + + var toVisit: [Visit] = root.consumers.map({ .transform([], $0)}) + var visited = Set<AnyPCollection>() // Cycle detection, etc + + while toVisit.count > 0 { + let item = toVisit.removeFirst() + if case .transform(let parents, let pipelineTransform) = item { + let inputs = parents.enumerated().map({ ("\($0)","\($1.name)")}).dict() + switch pipelineTransform { + + case let .pardo(n, fn, o): + let outputs = o.enumerated().map { + ("\($0)",collection(from: $1).name) + }.dict() + let p = try transform(name:n) { _,name in + try .with { + $0.uniqueName = name + $0.inputs = inputs + $0.outputs = outputs + $0.spec = try .with { + $0.urn = .transformUrn("pardo") + $0.payload = try Org_Apache_Beam_Model_Pipeline_V1_ParDoPayload.with { + $0.doFn = try .with { + $0.urn = fn.urn + $0.payload = try fn.payload + + } + }.serializedData() + } + $0.environmentID = defaultEnvironment.name + } + } + rootIds.append(p.name) //TODO: Composite transform handling + fns[p.transform!.uniqueName] = fn + toVisit.append(contentsOf: o.map { .collection($0) }) + case .impulse(let o): + let outputs = [o].enumerated().map { + ("\($0)",collection(from: $1).name) + 
}.dict() + let p = try transform { _,name in + .with { + $0.uniqueName = name + $0.outputs = outputs + $0.spec = .with { + $0.urn = .transformUrn("impulse") + } + $0.environmentID = defaultEnvironment.name + } + } + rootIds.append(p.name) + toVisit.append(.collection(o)) + case .flatten(_, _): + throw ApacheBeamError.runtimeError("flatten not implemented yet") + case .groupByKey(let o): + let outputs = [o].enumerated().map { + ("\($0)",collection(from: $1).name) + }.dict() + let p = try transform { _,name in + .with { + $0.uniqueName = name + $0.inputs = inputs + $0.outputs = outputs + $0.spec = .with { + $0.urn = .transformUrn("group_by_key") + } + $0.environmentID = defaultEnvironment.name + } + } + rootIds.append(p.name) + toVisit.append(.collection(o)) + case let .custom(urn,payload,env,o): + let outputs = o.enumerated().map { + ("\($0)",collection(from: $1).name) + }.dict() + let environment = if let e = env { + try proto.environment(from: e) + } else { + defaultEnvironment + } + let p = try transform { _,name in + .with { + $0.uniqueName = name + $0.inputs = inputs + $0.outputs = outputs + $0.spec = .with { + $0.urn = urn + $0.payload = payload + } + $0.environmentID = environment.name + } + } + rootIds.append(p.name) + toVisit.append(contentsOf:o.map { .collection($0) }) + case .composite(_): + throw ApacheBeamError.runtimeError("Composite transforms are not yet implemented") + } + } else if case .collection(let anyPCollection) = item { + if visited.contains(anyPCollection) { + throw ApacheBeamError.runtimeError("Pipeline definition contains a cycle.") + } + visited.insert(anyPCollection) + //TODO: Remove this to see if we can recreate the error I was seeing earlier for robertwb + if anyPCollection.consumers.count > 0 { + let me = collection(from:anyPCollection) + toVisit.append(contentsOf: anyPCollection.consumers.map({ .transform([me], $0)})) + } + } + } + + + + + } + return PipelineContext(pipeline,defaultEnvironment.name) + } + } } diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift new file mode 100644 index 00000000000..d944b86a7d6 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift @@ -0,0 +1,11 @@ +public final class PipelineContext { + + var proto : PipelineProto + let defaultEnvironmentId: String + + init(_ proto:PipelineProto,_ defaultEnvironmentId: String) { + self.proto = proto + self.defaultEnvironmentId = defaultEnvironmentId + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift index dea7daab738..7a54ecbc685 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift @@ -8,7 +8,7 @@ public enum PipelineTransform { case impulse(AnyPCollection) case flatten([AnyPCollection],AnyPCollection) case groupByKey(AnyPCollection) - case custom(String,Data,[AnyPCollection]) + case custom(String,Data,Environment?,[AnyPCollection]) case composite(AnyPTransform) } diff --git a/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift index 5727b6c3741..aeb71a39000 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift @@ -35,10 +35,6 @@ public enum Window { } } -public enum WindowingStrategy { - -} - public enum Timing : String { case early = "EARLY" case onTime = "ON_TIME" diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift b/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift new file mode 100644 index 00000000000..61c788c0387 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/Array+Helpers.swift @@ -0,0 +1,5 @@ +extension Array where Element == (String,String) { + func dict() -> [String:String] { + reduce(into:[:],{ $0[$1.0] = $1.1} ) 
+ } +} diff --git a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift new file mode 100644 index 00000000000..2ad5c73cfdb --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Coder.swift @@ -0,0 +1,28 @@ +extension PipelineProto { + + mutating func coder(from: Coder) -> PipelineComponent { + let componentCoders:[String] = switch from { + case let .keyvalue(keyCoder, valueCoder): + [coder(from:keyCoder).name,coder(from:valueCoder).name] + case let .iterable(valueCoder): + [coder(from:valueCoder).name] + case let .lengthprefix(valueCoder): + [coder(from:valueCoder).name] + case let .windowedvalue(valueCoder, windowCoder): + [coder(from:valueCoder).name,coder(from:windowCoder).name] + default: + [] + } + return coder { _ in + .with { + $0.spec = .with { + $0.urn = from.urn + if case .custom(let data) = from { + $0.payload = data + } + } + $0.componentCoderIds = componentCoders + } + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift new file mode 100644 index 00000000000..5207748237f --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift @@ -0,0 +1,38 @@ +extension PipelineProto { + + mutating func environment(from: Environment) throws -> PipelineComponent { + return try environment { _ in + try .with { p in + p.urn = switch from.category { + case .docker(_): .beamUrn("docker",type:"env") + case .system: .beamUrn("default",type:"env") + case .process(_, _, _, _): .beamUrn("process",type:"env") + case .external(_): .beamUrn("external",type:"env") + } + p.capabilities = from.capabilities + p.dependencies = from.dependencies.map({ $0.proto }) + + if case let .docker(containerImage) = from.category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with { + $0.containerImage = containerImage + 
}.serializedData() + } + + if case let .external(endpoint) = from.category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with { + $0.endpoint = endpoint.proto + }.serializedData() + } + + if case let .process(command,arch,os,env) = from.category { + p.payload = try Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with { + $0.arch = arch + $0.command = command + $0.os = os + $0.env = env + }.serializedData() + } + } + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift index 52f2b28aef1..54c3ae4586e 100644 --- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift +++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift @@ -24,6 +24,18 @@ public extension PCollection { } } } + + func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> PCollection<KV<K,V>> where Of == KV<K,V> { + pardo(name,prefix) { prefix,input,output in + for await element in input { + let kv = element.0 + for v in kv.values { + print("\(prefix): \(kv.key),\(v)") + } + output.emit(element) + } + } + } } /// Modifying Values diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift index 3e2d6a3be7d..625ba1eafbd 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -34,7 +34,7 @@ final class IntegrationTests: XCTestCase { } func testPortableWordcount() throws { - _ = Pipeline { pipeline in + try Pipeline { pipeline in let (contents,errors) = pipeline .create(["file1.txt","file2.txt","missing.txt"]) .pardo { filenames,output,errors in @@ -67,9 +67,12 @@ final class IntegrationTests: XCTestCase { $0.value ?? 1) }.sum() + _ = normalizedCounts.log(prefix:"COUNT OUTPUT") + _ = errors.log(prefix:"ERROR OUTPUT") - - } + }.run() + + }
