This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0bfcf41f45520f4382314e05c97af123ca549201 Author: Byron Ellis <[email protected]> AuthorDate: Wed Aug 23 18:55:18 2023 -0700 Added the primitives for a FileIO implementation with Google Storage support to start. Doesn't do anything fancy, but does implement a true version of wordcount with the classic Shakespeare input. --- sdks/swift/Package.resolved | 9 ++ sdks/swift/Package.swift | 10 +- .../Core/PCollection/AnyPCollection.swift | 8 +- .../Core/PCollection/AnyPCollectionStream.swift | 6 +- .../ApacheBeam/Core/PCollection/PCollection.swift | 15 +++ .../Core/PCollection/PCollectionTest.swift | 43 ++++++++ .../PTransform/{Outputs.swift => Output.swift} | 2 +- .../ApacheBeam/Core/PTransform/Transform.swift | 7 ++ .../ApacheBeam/Core/Pipeline/Pipeline.swift | 4 + .../Core/Pipeline/PipelineTransform.swift | 24 ++++- .../Schema/{DynamicRow.swift => Row.swift} | 9 +- sdks/swift/Sources/ApacheBeam/Schema/Schema.swift | 1 + .../Sources/ApacheBeam/Transforms/IO/FileIO.swift | 17 ++++ .../Transforms/IO/GoogleCloud/GoogleStorage.swift | 113 +++++++++++++++++++++ .../ApacheBeam/Transforms/IO/ListFiles.swift | 43 -------- .../Pipeline/CompositeIntegrationTests.swift | 1 + .../ApacheBeamTests/Pipeline/FileIOTests.swift | 83 +++++++++++++++ .../Pipeline/IntegrationTests.swift | 1 + 18 files changed, 342 insertions(+), 54 deletions(-) diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved index 1a12c374c0c..6aa79883fc0 100644 --- a/sdks/swift/Package.resolved +++ b/sdks/swift/Package.resolved @@ -63,6 +63,15 @@ "version" : "1.19.0" } }, + { + "identity" : "pythonkit", + "kind" : "remoteSourceControl", + "location" : "https://github.com/pvieito/PythonKit.git", + "state" : { + "branch" : "master", + "revision" : "060e1c8b0d14e4d241b3623fdbe83d0e3c81a993" + } + }, { "identity" : "smithy-swift", "kind" : "remoteSourceControl", diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift index 54cf8c5d88c..b25688b98b6 100644 --- a/sdks/swift/Package.swift +++ b/sdks/swift/Package.swift @@ -30,8 +30,8 @@ let dependencies: [Package.Dependency] = [ .package(url: "https://github.com/awslabs/aws-sdk-swift.git", from: "0.23.0"), .package(url: "https://github.com/googleapis/google-auth-library-swift",from:"0.0.0"), .package(url: "https://github.com/duckdb/duckdb-swift", .upToNextMinor(from: .init(0, 8, 0))), - - + .package(url: "https://github.com/pvieito/PythonKit.git", branch: "master"), + // Swift Package Manager Plugins .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0") ] @@ -54,10 +54,12 @@ let package = Package( .target( name: "ApacheBeam", dependencies: [ - .product(name:"GRPC",package:"grpc-swift"), + .product(name: "GRPC",package:"grpc-swift"), .product(name: "Logging",package:"swift-log"), .product(name: "AWSS3",package:"aws-sdk-swift"), - .product(name: "DuckDB", package: "duckdb-swift") + .product(name: "DuckDB", package: "duckdb-swift"), + .product(name: "PythonKit", package:"PythonKit"), + .product(name: "OAuth2", package:"google-auth-library-swift") ] ), .testTarget( diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift index 0b67e6068d8..48c6b39c2b2 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift @@ -16,7 +16,7 @@ * limitations under the License. */ -public struct AnyPCollection : PCollectionProtocol { +public struct AnyPCollection : PCollectionProtocol,PipelineMember { @@ -29,6 +29,7 @@ public struct AnyPCollection : PCollectionProtocol { let consumersClosure: (Any) -> [PipelineTransform] let coderClosure: (Any) -> Coder let streamClosure: (Any) -> AnyPCollectionStream + let rootsClosure: (Any) -> [PCollection<Never>] public init<C>(_ collection: C) where C : PCollectionProtocol { if let anyCollection = collection as? AnyPCollection { @@ -43,6 +44,7 @@ public struct AnyPCollection : PCollectionProtocol { self.coderClosure = { ($0 as! C).coder } self.streamClosure = { AnyPCollectionStream(($0 as! C).stream) } self.parentClosure = { ($0 as! C).parent } + self.rootsClosure = { ($0 as! PipelineMember).roots } } } @@ -73,6 +75,10 @@ public struct AnyPCollection : PCollectionProtocol { streamClosure(collection) } + var roots: [PCollection<Never>] { + rootsClosure(collection) + } + } extension AnyPCollection : Hashable { diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift index 4e5ea0bfb0c..959d9a04217 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift @@ -53,9 +53,11 @@ public struct AnyPCollectionStream : AsyncSequence { try stream.emit(beamValue) } else if let element = $1 as? Element { stream.emit((element.0 as! Of,element.1,element.2)) - } else if let element = $1 as? PCollectionStream<Of>.Element { + } else if let element = $1 as? Of { stream.emit(element) - } + } else { + throw ApacheBeamError.runtimeError("Unable to send \($1) to \(stream)") + } } self.finishClosure = { diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift index b8cf712be13..9918ed69ace 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollection.swift @@ -59,3 +59,18 @@ public final class PCollection<Of> : PCollectionProtocol { } + +extension PCollection : PipelineMember { + var roots: [PCollection<Never>] { + if let p = parent { + return p.roots + } else if let p = self as? PCollection<Never> { + return [p] + } else { + return [] + } + } + + +} + diff --git a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift new file mode 100644 index 00000000000..221ea8bdb0a --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionTest.swift @@ -0,0 +1,43 @@ +import Logging + +/// Test harness for PCollections. Primarily designed for unit testing. +public struct PCollectionTest { + let output: AnyPCollection + let fn: (Logger,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void + + public init<Of>(_ collection: PCollection<Of>,_ fn: @escaping (Logger,[AnyPCollectionStream],[AnyPCollectionStream]) async throws -> Void) { + self.output = AnyPCollection(collection) + self.fn = fn + } + + public func run() async throws { + let log = Logger(label:"Test") + if let transform = output.parent { + switch transform { + case let .pardo(parent,_,fn,outputs): + let context = SerializableFnBundleContext(instruction:"1",transform:"test",payload:try fn.payload,log: log) + let input = parent.anyStream + let streams = outputs.map({$0.anyStream}) + + try await withThrowingTaskGroup(of: Void.self) { group in + log.info("Starting process task") + group.addTask { + _ = try await fn.process(context: context, inputs:[input], outputs: streams) + } + log.info("Calling Stream") + group.addTask { + try await self.fn(log,[input],streams) + } + for try await _ in group { + } + } + default: + throw ApacheBeamError.runtimeError("Not able to test this type of transform \(transform)") + } + } else { + throw ApacheBeamError.runtimeError("Unable to determine parent transform to test") + } + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift similarity index 85% rename from sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift rename to sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift index a0c246a438a..89c2d439ac9 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Outputs.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Output.swift @@ -3,7 +3,7 @@ public struct NamedCollectionPTransform<Of> : _PrimitivePTransform { let collection: PCollection<Of> } -/// Captures a single pcollection and gives it a name +/// Captures a PCollection and gives it a name so it can be used as an output public struct Output<Of> : PTransform { let name: String let fn: () -> PCollection<Of> diff --git a/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift new file mode 100644 index 00000000000..f5cbc70570f --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/PTransform/Transform.swift @@ -0,0 +1,7 @@ +/// Groups transforms together. Does not expose any of the pcollections are accessible outputs +public struct Transform<Subtransform> { + let subtransform:Subtransform + public init(@PTransformBuilder subtransform: () -> Subtransform) { + self.subtransform = subtransform() + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift index b9cab1376a5..825680cfa4c 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift @@ -19,6 +19,10 @@ import GRPC import Logging +protocol PipelineMember { + var roots: [PCollection<Never>] { get } +} + public final class Pipeline { let content: (inout PCollection<Never>) -> Void let log: Logging.Logger diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift index 1ce5c5f6bdc..d7fca814b68 100644 --- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift +++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineTransform.swift @@ -30,5 +30,27 @@ public enum PipelineTransform { case external(AnyPCollection,[AnyPCollection]) } - +extension PipelineTransform : PipelineMember { + var roots: [PCollection<Never>] { + switch self { + + case .pardo(let p, _, _, _): + return p.roots + case .impulse(let p, _): + return p.roots + case .flatten(let p, _): + return p.flatMap({ $0.roots }) + case .groupByKey(let p, _): + return p.roots + case .custom(let p, _, _, _, _): + return p.roots + case .composite(let p, _): + return p.roots + case .external(let p, _): + return p.roots + } + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift b/sdks/swift/Sources/ApacheBeam/Schema/Row.swift similarity index 92% rename from sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift rename to sdks/swift/Sources/ApacheBeam/Schema/Row.swift index b0a4054d2e5..f0991a94cfe 100644 --- a/sdks/swift/Sources/ApacheBeam/Schema/DynamicRow.swift +++ b/sdks/swift/Sources/ApacheBeam/Schema/Row.swift @@ -16,8 +16,13 @@ * limitations under the License. */ +import Foundation + /// Dynamic representation of a Row that lets us treat a row value like JSON. Obviously this is less performant and /// when using schema objects internally, particularly in PCollections we would favor @Row structs -public struct DynamicRow { - +public struct Row { + let data: Data + let schema: Schema } + + diff --git a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift index 1404f5e8efd..489c8ac98bb 100644 --- a/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift +++ b/sdks/swift/Sources/ApacheBeam/Schema/Schema.swift @@ -13,6 +13,7 @@ public indirect enum FieldType { public struct Field { let name: String + let description: String? let type: FieldType } diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift new file mode 100644 index 00000000000..fba04c58627 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/FileIO.swift @@ -0,0 +1,17 @@ +import Foundation + +public protocol FileIOSource { + static func readFiles(matching:PCollection<KV<String,String>>) -> PCollection<Data> + static func listFiles(matching:PCollection<KV<String,String>>) -> PCollection<KV<String,String>> +} + +public extension PCollection<KV<String,String>> { + func readFiles<Source:FileIOSource>(in source:Source.Type) -> PCollection<Data> { + Source.readFiles(matching:self) + } + + /// Takes a KV pair of (bucket,prefix) and returns a list of (bucket,filename) + func listFiles<Source:FileIOSource>(in source:Source.Type) -> PCollection<KV<String,String>> { + Source.listFiles(matching:self) + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift new file mode 100644 index 00000000000..444f8e40320 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Transforms/IO/GoogleCloud/GoogleStorage.swift @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import OAuth2 +import Foundation +import Dispatch + +struct ListFilesResponse: Codable { + struct Item : Codable { + let kind: String + let selfLink: String + let mediaLink: String + let name: String + let bucket: String + let size: String + } + + let kind: String + let items: [Item] +} + +public struct GoogleStorage : FileIOSource { + public static func readFiles(matching: PCollection<KV<String,String>>) -> PCollection<Data> { + matching.pardo { matching,output in + guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.get"]) else { + throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.") + } + let connection = Connection(provider: tokenProvider) + for await (file,_,_) in matching { + let bucket = file.key + for name in file.values { + let url = "https://storage.googleapis.com/storage/v1/b/\(bucket)/o/\(name.addingPercentEncoding(withAllowedCharacters: .urlHostAllowed)!)" + let response:Data? = try await withCheckedThrowingContinuation { continuation in + do { + try connection.performRequest(method: "GET", + urlString: url , + parameters: ["alt":"media"],body:nil) { + data,response,error in + if let e = error { + continuation.resume(throwing: e) + } else { + continuation.resume(returning: data) + } + } + } catch { + continuation.resume(throwing: error) + } + } + if let d = response { + output.emit(d) + } + } + } + } + } + + public static func listFiles(matching: PCollection<KV<String,String>>) -> PCollection<KV<String,String>> { + matching.pardo { matching,output in + + guard let tokenProvider = DefaultTokenProvider(scopes: ["storage.objects.list"]) else { + throw ApacheBeamError.runtimeError("Unable to get OAuth2 token.") + } + let connection = Connection(provider: tokenProvider) + for await (match,_,_) in matching { + let bucket = match.key + for prefix in match.values { + let response:Data? = try await withCheckedThrowingContinuation { continuation in + do { + try connection.performRequest( + method: "GET", + urlString: "https://storage.googleapis.com/storage/v1/b/\(bucket)/o", + parameters:["prefix": prefix], + body:nil) { data,response,error in + if let e = error { + continuation.resume(throwing: e) + } else { + continuation.resume(returning: data) + } + } + } catch { + continuation.resume(throwing: error) + } + } + if let data = response { + let listfiles = try JSONDecoder().decode(ListFilesResponse.self, from: data) + for item in listfiles.items { + if item.size != "0" { + output.emit(KV(item.bucket,item.name)) + } + } + } + } + } + } + } + + +} diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift b/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift deleted file mode 100644 index 2ebb7f852d3..00000000000 --- a/sdks/swift/Sources/ApacheBeam/Transforms/IO/ListFiles.swift +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -struct S3Bucket : Codable { - let bucket:String -} - -struct GSBucket : Codable { - let bucket:String -} - -public extension PCollection { - - func listFiles(s3 bucket:String) { - - } - - func listFiles(gs bucket:String) { - } - -} - -public func listFiles(s3 bucket: String) { -} - -public func listFiles(gs bucket: String) { - -} diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift index f2ee142a966..bddf3abc47d 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/CompositeIntegrationTests.swift @@ -66,6 +66,7 @@ final class CompositeIntegrationTests: XCTestCase { } func testCompositeWordCount() async throws { + throw XCTSkip() try await Pipeline { FixtureWordCount(fixtures: ["file1.txt","file2.txt","missing.txt"]) }.run(PortableRunner(loopback:true)) diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift new file mode 100644 index 00000000000..1806de4c098 --- /dev/null +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift @@ -0,0 +1,83 @@ +// +// FileIOTests.swift +// +// +// Created by Byron Ellis on 8/22/23. +// +import ApacheBeam +import Logging +import XCTest + +final class FileIOTests: XCTestCase { + + override func setUpWithError() throws { + } + + override func tearDownWithError() throws { + } + + func testGoogleStorageListFiles() async throws { + throw XCTSkip() + try await PCollectionTest(PCollection<KV<String,String>>().listFiles(in: GoogleStorage.self)) { log,inputs,outputs in + log.info("Sending value") + try inputs[0].emit(value:KV("dataflow-samples","shakespeare")) + log.info("Value sent") + inputs[0].finish() + for try await (output,_,_) in outputs[0] { + log.info("Output: \(output)") + } + }.run() + } + + func testGoogleStorageReadFiles() async throws { + try await PCollectionTest(PCollection<KV<String,String>>().readFiles(in: GoogleStorage.self)) { log,inputs,outputs in + throw XCTSkip() + log.info("Sending value") + try inputs[0].emit(value:KV("dataflow-samples","shakespeare/asyoulikeit.txt")) + log.info("Value sent") + inputs[0].finish() + for try await (output,_,_) in outputs[0] { + log.info("Output: \(String(data:output as! Data,encoding:.utf8)!)") + } + }.run() + } + + func testShakespeareWordcount() async throws { + try await Pipeline { pipeline in + let contents = pipeline + .create(["dataflow-samples/shakespeare"]) + .map({ value in + let parts = value.split(separator: "/",maxSplits: 1) + return KV(parts[0].lowercased(),parts[1].lowercased()) + }) + .listFiles(in: GoogleStorage.self) + .readFiles(in: GoogleStorage.self) + + // Simple ParDo that takes advantage of enumerateLines. No name to test name generation of pardos + let lines = contents.pardo { contents,lines in + for await (content,_,_) in contents { + String(data:content,encoding:.utf8)!.enumerateLines { line,_ in + lines.emit(line) + } + } + } + + // Our first group by operation + let baseCount = lines + .flatMap({ $0.components(separatedBy: .whitespaces) }) + .groupBy({ ($0,1) }) + .sum() + .log(prefix:"INTERMEDIATE OUTPUT") + + let normalizedCounts = baseCount.groupBy { + ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters), + $0.value ?? 1) + }.sum() + + normalizedCounts.log(prefix:"COUNT OUTPUT") + + }.run(PortableRunner(loopback:true)) + } + + +} diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift index 0c34fcb42ae..b86b941cd7f 100644 --- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift +++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift @@ -45,6 +45,7 @@ final class IntegrationTests: XCTestCase { } func testPortableWordcount() async throws { + throw XCTSkip() try await Pipeline { pipeline in let (contents,errors) = pipeline .create(["file1.txt","file2.txt","missing.txt"])
