This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9ebd42c7948b9240740b20581fb828cb13021d4c Author: Byron Ellis <[email protected]> AuthorDate: Tue Aug 15 10:09:50 2023 -0700 Add missing file --- .../Internal/PipelineProto+Initializers.swift | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift new file mode 100644 index 00000000000..eebfec33168 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Initializers.swift @@ -0,0 +1,75 @@ +enum PipelineComponent { + case none + case transform(String,PTransformProto) + case collection(String,PCollectionProto) + case coder(String,CoderProto) + case windowingStrategy(String,WindowingStrategyProto) + case environment(String,EnvironmentProto) + + var name: String { + switch self { + case .none: + fatalError("PipelineComponent not properly initialized") + case .transform(let n, _): + return n + case .collection(let n, _): + return n + case .coder(let n, _): + return n + case .windowingStrategy(let n, _): + return n + case .environment(let n, _): + return n + } + + } + + var transform: PTransformProto? { + if case .transform(_, let pTransformProto) = self { + return pTransformProto + } else { + return nil + } + } + +} + +/// Convenience function for creating new pipeline elements. Note that these shouldn't be accessed concurrently +/// but this isn't a problem itself since trying to access the proto concurrently throws an error. +extension PipelineProto { + mutating func transform(_ mapper: @escaping (String) throws -> PTransformProto) throws -> PipelineComponent { + let name = "ref_PTransform_\(self.components.transforms.count+1)" + let proto = try mapper(name) + self.components.transforms[name] = proto + return .transform(name,proto) + } + + mutating func collection(_ mapper: @escaping (String) -> PCollectionProto) -> PipelineComponent { + let name = "ref_PCollection_\(self.components.pcollections.count+1)" + let proto = mapper(name) + self.components.pcollections[name] = proto + return .collection(name, proto) + } + + mutating func coder(_ mapper: @escaping (String) -> CoderProto) -> PipelineComponent { + let name = "ref_Coder_\(self.components.coders.count+1)" + let proto = mapper(name) + self.components.coders[name] = proto + return .coder(name, proto) + } + + mutating func windowingStrategy(_ mapper: @escaping (String) -> WindowingStrategyProto) -> PipelineComponent { + let name = "ref_WindowingStrategy_\(self.components.coders.count+1)" + let proto = mapper(name) + self.components.windowingStrategies[name] = proto + return .windowingStrategy(name, proto) + } + + mutating func environment(_ mapper: @escaping (String) throws -> EnvironmentProto) throws -> PipelineComponent { + let name = "ref_Environment_\(self.components.coders.count+1)" + let proto = try mapper(name) + self.components.environments[name] = proto + return .environment(name, proto) + } + +}
