This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git
commit c13e652d2a4051e9d846d9dd31cbd0fc3447bdd8
Author: Byron Ellis <[email protected]>
AuthorDate: Fri Aug 25 13:41:43 2023 -0700

    Changes to allow the Swift SDK to operate successfully with the Flink Portable
    Runner as well as the Python Portable Runner. Modified the PInput/POutput
    functionality to just be structs, which allows us to use them both for closures
    and for the eventual DoFn interface, and cleans up the function signatures.
---
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |  8 ++-
 .../Sources/ApacheBeam/Coders/Coder+Encoding.swift |  6 +-
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift   | 29 ++++++++-
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |  2 -
 .../ApacheBeam/Internal/Data+Decoding.swift        |  7 ++-
 .../ApacheBeam/Internal/Data+Encoding.swift        |  6 ++
 .../ApacheBeam/Internal/Date+Timestamp.swift       |  2 +-
 .../Runtime/Bundle/BundleProcessor.swift           | 19 ++++--
 .../Sources/ApacheBeam/Runtime/Bundle/Sink.swift   |  1 +
 .../Sources/ApacheBeam/Runtime/Bundle/Source.swift | 14 ++++-
 .../ApacheBeam/Runtime/DataplaneClient.swift       | 41 +++++++++++--
 .../Sources/ApacheBeam/Runtime/Worker/Worker.swift | 11 +++-
 .../ApacheBeam/Transforms/BuiltIn+Elements.swift   | 71 +++-------------------
 .../Sources/ApacheBeam/Transforms/Combining.swift  |  3 +-
 .../ApacheBeamTests/Pipeline/FileIOTests.swift     |  9 +--
 .../Pipeline/IntegrationTests.swift                |  2 +-
 16 files changed, 137 insertions(+), 94 deletions(-)

diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 9eef4113576..043b6690981 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -52,8 +52,10 @@ public extension Coder {
             let length = try data.next(Int32.self).byteSwapped
             return .array(try (0..<length).map({ _ in try coder.decode(&data) }))
         case let .windowedvalue(valueCoder, windowCoder):
-            let timestamp = try data.next(Int64.self).byteSwapped &+ Int64(-9223372036854775808)
-            let windowCount = try data.next(Int32.self).byteSwapped
+            // This will be big endian to match java
+            let timestamp = try data.instant()
+
+            let windowCount = Int32(bigEndian: try data.next(Int32.self))
             if windowCount > 1 {
                 throw ApacheBeamError.runtimeError("Windowed values with > 1 window not yet supported")
             }
@@ -72,7 +74,7 @@ public extension Coder {
             default:
                 throw ApacheBeamError.runtimeError("Invalid pane encoding \(String(pane,radix:2))")
             }
-            return .windowed(try valueCoder.decode(&data), Date(millisecondsSince1970: timestamp), pane, window)
+            return .windowed(try valueCoder.decode(&data), timestamp, pane, window)
         default:
             throw ApacheBeamError.runtimeError("Decoding of \(self.urn) coders not supported.")
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
index fffeb351a53..5076275dffc 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
@@ -90,12 +90,12 @@ public extension Coder {
         case let .windowedvalue(valueCoder, windowCoder):
             if let (v,ts,w) = value as? (Any,Date,Window) {
                 //Timestamp
-                data.next( (ts.millisecondsSince1970 &- Int64(-9223372036854775808)).bigEndian )
+                data.instant(ts)
                 switch w {
                 case .global:
-                    data.next(Int32(0))
+                    data.next(Int32(1).bigEndian)
                 default:
-                    data.next(Int32(1))
+                    data.next(Int32(1).bigEndian)
                     try windowCoder.encode(w,data:&data)
                 }
                 // TODO: Real Panes
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
index 713b1de6796..28b809e5c53 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import Logging
+import Foundation
 
 /// A higher level interface to SerializableFn using dependency injected dynamic properties in the same
 /// way as we define Composite PTransforms
@@ -29,4 +29,31 @@ public extension DoFn {
     func finishBundle() async throws { }
 }
 
+public struct PInput<Of> {
+    public let value: Of
+    public let timestamp: Date
+    public let window: Window
+
+    public init(_ value: Of,_ timestamp: Date,_ window: Window) {
+        self.value = value
+        self.timestamp = timestamp
+        self.window = window
+    }
+
+    public init(_ element: (Of,Date,Window)) {
+        self.value = element.0
+        self.timestamp = element.1
+        self.window = element.2
+    }
+
+}
+public struct POutput<Of> {
+    let stream: PCollectionStream<Of>
+    let timestamp: Date
+    let window: Window
+
+    func emit(_ value: Of,timestamp: Date? = nil,window: Window? = nil) {
+        stream.emit(value,timestamp: timestamp ?? self.timestamp,window: window ?? self.window)
+    }
+}
 
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index 825680cfa4c..34e404eca83 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -211,7 +211,6 @@ public final class Pipeline {
                 $0.spec = .with {
                     $0.urn = .transformUrn("impulse")
                 }
-                $0.environmentID = defaultEnvironment.name
             }
         }
         rootIds.append(p.name)
@@ -232,7 +231,6 @@ public final class Pipeline {
                 $0.spec = .with {
                     $0.urn = .transformUrn("group_by_key")
                 }
-                $0.environmentID = defaultEnvironment.name
             }
         }
         rootIds.append(p.name)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
index 978898fb51f..6487063e0e3 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
@@ -68,6 +68,11 @@ extension Data {
         return result
     }
 
+    mutating func instant() throws -> Date {
+        let millis = Int64(bigEndian:try next(Int64.self)) &+ Int64(-9223372036854775808)
+        return Date(millisecondsSince1970: millis)
+    }
+
     /// Read a length prefixed chunk of data
     mutating func subdata() throws -> Data {
         let length = try self.varint()
@@ -80,7 +85,7 @@ extension Data {
     mutating func next<T>(_ type: T.Type) throws -> T {
         let size = MemoryLayout<T>.size
         let result = self.withUnsafeBytes {
-            $0.baseAddress!.withMemoryRebound(to: T.self, capacity: 1) { $0.pointee }
+            $0.load(as: T.self)
         }
         self = self.safeAdvance(by: size)
         return result
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
index 86007b21104..1a947162251 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
@@ -27,6 +27,12 @@ extension Data {
         self.append(UInt8(current))
     }
 
+    mutating func instant(_ value: Date) {
+        Swift.withUnsafeBytes(of:(Int64(value.millisecondsSince1970) &- Int64(-9223372036854775808)).bigEndian) {
+            self.append(contentsOf: $0)
+        }
+    }
+
     mutating func next<T>(_ value: T) {
         Swift.withUnsafeBytes(of:value) {
             self.append(contentsOf: $0)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
index 16eb658d050..9c3424584ef 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
@@ -22,6 +22,6 @@ extension Date {
         Int64((self.timeIntervalSince1970 * 1000.0).rounded())
     }
     init(millisecondsSince1970: Int64) {
-        self = Date(timeIntervalSince1970: TimeInterval(millisecondsSince1970) / 1000)
+        self = Date(timeIntervalSince1970: Double(millisecondsSince1970) / 1000.0)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 4e977d6f112..1e46b3e723d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -37,7 +37,7 @@ struct BundleProcessor {
          descriptor:Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
          collections: [String:AnyPCollection],
          fns: [String:SerializableFn]) throws {
-        self.log = Logging.Logger(label: "BundleProcessor(\(descriptor.id))")
+        self.log = Logging.Logger(label: "BundleProcessor(\(id) \(descriptor.id))")
         var temp: [Step] = []
         var coders = BundleCoderContainer(bundle:descriptor)
 
@@ -60,7 +60,7 @@ struct BundleProcessor {
 
-        for (_,transform) in descriptor.transforms {
+        for (transformId,transform) in descriptor.transforms {
             let urn = transform.spec.urn
             //Map the input and output streams in the correct order
             let inputs = transform.inputs.sorted().map { streams[$0.1]! }
@@ -69,8 +69,9 @@ struct BundleProcessor {
             if urn == "beam:runner:source:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Source '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : transform.uniqueName,
                     fn:Source(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -79,8 +80,9 @@ struct BundleProcessor {
             } else if urn == "beam:runner:sink:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Sink '\(transformId)','\(transform.uniqueName)' \(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : transform.uniqueName,
                     fn:Sink(client: try .client(for: ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -108,16 +110,23 @@ struct BundleProcessor {
     public func process(instruction: String,responder: AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation) async {
         _ = await withThrowingTaskGroup(of: (String,String).self) { group in
+            log.info("Starting bundle processing for \(instruction)")
+            var count: Int = 0
             do {
                 for step in self.steps {
+                    log.info("Starting Task \(step.transformId)")
                     let context = SerializableFnBundleContext(instruction: instruction, transform: step.transformId, payload: step.payload, log: self.log)
                     group.addTask {
                         return try await step.fn.process(context: context, inputs: step.inputs, outputs: step.outputs)
                     }
+                    count += 1
                 }
+                var finished: Int = 0
                 for try await (instruction,transform) in group {
-                    log.info("Task Completed (\(instruction),\(transform))")
+                    finished += 1
+                    log.info("Task Completed (\(instruction),\(transform)) \(finished) of \(count)")
                 }
+                log.info("All tasks completed for \(instruction)")
                 responder.yield(.with {
                     $0.instructionID = instruction
                 })
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
index 19ad99c7d99..620a6758261 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -39,6 +39,7 @@ final class Sink : SerializableFn {
         }
         emitter.yield(.last(context.instruction, context.transform))
         emitter.finish()
+        await client.finalizeStream(instruction: context.instruction, transform: context.transform)
         return (context.instruction,context.transform)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
index fd2888ad027..90d28057f48 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -17,25 +17,32 @@
  */
 
 import Foundation
+import Logging
 
 /// Custom SerializableFn that reads/writes from an external data stream using a defined coder. It assumes that a given
 /// data element might contain more than one coder
 final class Source : SerializableFn {
-    
+
     let client: DataplaneClient
     let coder: Coder
+    let log: Logger
 
     public init(client: DataplaneClient,coder:Coder) {
         self.client = client
         self.coder = coder
-        
+        self.log = Logger(label:"Source")
     }
 
     func process(context: SerializableFnBundleContext, inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
+        log.info("Waiting for input on \(context.instruction)-\(context.transform)")
         let (stream,_) = await client.makeStream(instruction: context.instruction, transform: context.transform)
+
+        var messages: Int = 0
+        var count: Int = 0
         for await message in stream {
+            messages += 1
             switch message {
             case let .data(data):
                 var d = data
@@ -43,6 +50,7 @@ final class Source : SerializableFn {
                     let value = try coder.decode(&d)
                     for output in outputs {
                         try output.emit(value: value)
+                        count += 1
                     }
                 }
             case let .last(id, transform):
@@ -50,9 +58,11 @@ final class Source : SerializableFn {
                     output.finish()
                 }
                 await client.finalizeStream(instruction: id, transform: transform)
+                log.info("Source \(context.instruction),\(context.transform) handled \(count) items over \(messages) messages")
                 return (id,transform)
             //TODO: Handle timer messages
             default:
+                log.info("Unhandled message \(message)")
                 break
             }
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
index da820e0675b..254842e2d9d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
@@ -67,7 +67,7 @@ actor DataplaneClient {
     private var streams: [Pair:(Stream,Stream.Continuation,MultiplexContinuation)] = [:]
     private let flush: Int
 
-    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=100) throws {
+    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=1000) throws {
         self.id = id
         self.log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))")
         self.multiplex = AsyncStream.makeStream(of:Multiplex.self)
@@ -80,7 +80,9 @@ actor DataplaneClient {
             log.info("Initiating data plane multiplexing.")
             let input = multiplex.0
-
+            var count: Int = 0
+            var flushes: Int = 0
+            var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
             for try await element in input {
                 var shouldFlush: Bool = false
 
@@ -92,6 +94,7 @@ actor DataplaneClient {
                         $0.transformID = element.transform
                         $0.data = payload
                     })
+                    count += 1
                 case let .timer(family, payload):
                     elements.timers.append(.with {
                         $0.instructionID = element.id
@@ -99,6 +102,7 @@ actor DataplaneClient {
                         $0.timerFamilyID = family
                         $0.timers = payload
                     })
+                    count += 1
                 case let .last(id, transform):
                     elements.data.append(.with {
                         $0.instructionID = id
@@ -106,19 +110,36 @@ actor DataplaneClient {
                         $0.isLast = true
                     })
                     shouldFlush = true
+                    count += 1
                 case .flush:
                     shouldFlush = true
                 }
                 if shouldFlush || elements.data.count + elements.timers.count >= flush {
                     do {
+                        if case .last = element.message {
+                            log.info("Got last message, flushing \(elements.data.count + elements.timers.count) elements to data plane")
+                        }
                         try await stream.requestStream.send(elements)
                     } catch {
                         log.error("Unable to multiplex elements onto data plane: \(error)")
                     }
                     elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
                     shouldFlush = false
+                    flushes += 1
+                }
+                if count % 50000 == 0 && count > 0 {
+                    log.info("Processed \(count) elements (\(flushes) flushes)")
                 }
             }
+            if(elements.data.count + elements.timers.count > 0) {
+                do {
+                    log.info("Flushing final elements to data plane.")
+                    try await stream.requestStream.send(elements)
+                } catch {
+                    log.error("Unable to multiplex final elements onto data plane: \(error)")
+                }
+            }
+            log.info("Shutting down dataplane multiplexing")
         }
 
         // Demux task
@@ -132,7 +153,6 @@ actor DataplaneClient {
                     for element in elements.data {
                         let key = Pair(id: element.instructionID, transform: element.transformID)
 
-                        //Drop zero-length elements
                         if element.data.count > 0 {
                             messages[key,default:[]].append(.data(element.data))
                         }
@@ -159,15 +179,20 @@ actor DataplaneClient {
                         }
                     }
                     // Send any last messages
+                    // TODO: Fix known race here. We try to re-use streams across bundles which can lead to a race where yield is sent too early.
                    for (key,value) in last {
                        let output = await self.makeStream(key: key).1
                        output.yield(value)
                    }
                }
            } catch {
-                log.error("Lost data plane connection.")
+                log.error("Lost data plane connection. Closing all outstanding streams")
+                for (id,stream) in await streams {
+                    log.info("Closing stream \(id)")
+                    stream.1.finish()
+                }
            }
-
+            multiplex.1.finish()
        }
    }
@@ -192,7 +217,11 @@ actor DataplaneClient {
     }
 
     func finalizeStream(instruction:String,transform:String) {
-        //TODO: Implement finalization.
+        let key = Pair(id:instruction,transform:transform)
+        log.info("Done with stream \(key)")
+        if let element = streams.removeValue(forKey: key) {
+            element.1.finish()
+        }
     }
 
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 08204a11184..1a19f930f2c 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -52,6 +52,7 @@ actor Worker {
         //Start the response task. This will continue until a yield call is sent from responder
         Task {
             for await r in responses {
+                log.info("Sending response \(r)")
                 try await control.requestStream.send(r)
             }
         }
@@ -75,8 +76,14 @@ actor Worker {
             for try await instruction in control.responseStream {
                 switch instruction.request {
                 case .processBundle(let pbr):
-                    try await processor(for:pbr.processBundleDescriptorID)
-                        .process(instruction: instruction.instructionID,responder:responder)
+                    do {
+                        let p = try await processor(for:pbr.processBundleDescriptorID)
+                        Task {
+                            await p.process(instruction: instruction.instructionID,responder:responder)
+                        }
+                    } catch {
+                        log.error("Unable to process bundle \(pbr.processBundleDescriptorID): \(error)")
+                    }
                     break
                 default:
                     log.warning("Ignoring instruction \(instruction.instructionID). Not yet implemented.")
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
index e29ea085b7d..7febd84895a 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
@@ -18,87 +18,34 @@
 
 import Foundation
 
-public protocol PInput<Of> {
-    associatedtype Of
-
-    var value: Of { get }
-    var timestamp: Date { get }
-    var window: Window { get }
-}
-
-public protocol POutput<Of> {
-    associatedtype Of
-
-    func emit(_ value: Of,timestamp: Date,window: Window)
-    func emit(_ value: Of)
-    func emit(_ value: Of,timestamp: Date)
-    func emit(_ value: Of,window: Window)
-}
-
-
-struct PardoInput<Of> : PInput {
-    let value: Of
-    let timestamp: Date
-    let window: Window
-    public init(_ value:(Of,Date,Window)) {
-        self.value = value.0
-        self.timestamp = value.1
-        self.window = value.2
-    }
-}
-
-struct PardoOutput<Of> : POutput {
-
-
-    let stream: PCollectionStream<Of>
-    let timestamp: Date
-    let window: Window
-
-    func emit(_ value: Of, timestamp: Date, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-    func emit(_ value: Of, timestamp: Date) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }
-    func emit(_ value: Of, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }
-    func emit(_ value: Of) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-
-
-}
-
 public extension PCollection {
     // No Output
-    func pardo(name:String,_ fn: @Sendable @escaping (any PInput<Of>) async throws -> Void) {
+    func pardo(name:String,_ fn: @Sendable @escaping (PInput<Of>) async throws -> Void) {
         pstream(name:name) { input in
             for try await element in input {
-                try await fn(PardoInput(element))
+                try await fn(PInput(element))
             }
         }
     }
     // One Output
-    func pardo<O0>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>) async throws -> Void) -> PCollection<O0> {
+    func pardo<O0>(name:String,_ fn: @Sendable @escaping (PInput<Of>,POutput<O0>) async throws -> Void) -> PCollection<O0> {
         pstream(name:name) { input,output in
             for try await element in input {
-                try await fn(PardoInput(element),PardoOutput(stream:output,timestamp:element.1,window:element.2))
+                try await fn(PInput(element),
+                             POutput(stream:output,timestamp:element.1,window:element.2))
             }
         }
     }
     //Two Outputs
-    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any POutput<O0>,any POutput<O1>) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
+    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (PInput<Of>,POutput<O0>,POutput<O1>) async throws -> Void) -> (PCollection<O0>,PCollection<O1>) {
         pstream(name:name) { input,o0,o1 in
             for try await element in input {
-                try await fn(PardoInput(element),
-                             PardoOutput(stream:o0,timestamp:element.1,window:element.2),
-                             PardoOutput(stream:o1,timestamp:element.1,window:element.2)
+                try await fn(PInput(element),
+                             POutput(stream:o0,timestamp:element.1,window:element.2),
+                             POutput(stream:o1,timestamp:element.1,window:element.2)
                 )
             }
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
index 7abd847b8b5..a03f3e6ec11 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -25,7 +25,8 @@ public extension PCollection {
                 for v in kv.values {
                     accumulator(v,&result)
                 }
-                output.emit(KV(kv.key,result),timestamp:ts,window:w)
+                let intermediate = KV(kv.key,result)
+                output.emit(intermediate,timestamp:ts,window:w)
             }
         }
 
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index b4ffa125b08..142f264bd99 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -30,8 +30,8 @@ final class FileIOTests: XCTestCase {
     }
 
     func testGoogleStorageReadFiles() async throws {
+        throw XCTSkip()
         try await PCollectionTest(PCollection<KV<String,String>>().readFiles(in: GoogleStorage.self)) { log,inputs,outputs in
-            throw XCTSkip()
             log.info("Sending value")
             try inputs[0].emit(value:KV("dataflow-samples","shakespeare/asyoulikeit.txt"))
             log.info("Value sent")
@@ -43,11 +43,13 @@ final class FileIOTests: XCTestCase {
     }
 
     func testShakespeareWordcount() async throws {
+        //throw XCTSkip()
         try await Pipeline { pipeline in
             let contents = pipeline
                 .create(["dataflow-samples/shakespeare"])
                 .map({ value in
                     let parts = value.split(separator: "/",maxSplits: 1)
+                    print("Got filename \(parts) from \(value)")
                     return KV(parts[0].lowercased(),parts[1].lowercased())
                 })
                 .listFiles(in: GoogleStorage.self)
@@ -64,10 +66,9 @@ final class FileIOTests: XCTestCase {
 
             // Our first group by operation
             let baseCount = lines
-                .flatMap({ $0.components(separatedBy: .whitespaces) })
+                .flatMap({ (line:String) in line.components(separatedBy: .whitespaces) })
                 .groupBy({ ($0,1) })
                 .sum()
-                .log(prefix:"INTERMEDIATE OUTPUT")
 
             let normalizedCounts = baseCount.groupBy {
                 ($0.key.lowercased().trimmingCharacters(in: .punctuationCharacters),
@@ -76,7 +77,7 @@ final class FileIOTests: XCTestCase {
 
             normalizedCounts.log(prefix:"COUNT OUTPUT")
 
-        }.run(PortableRunner(loopback:true))
+        }.run(PortableRunner(port:8099,loopback:true))
 
     }
 
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 27e5972c1e7..0ea5d962d92 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -83,7 +83,7 @@ final class IntegrationTests: XCTestCase {
             normalizedCounts.log(prefix:"COUNT OUTPUT")
             errors.log(prefix:"ERROR OUTPUT")
 
-        }.run(PortableRunner(loopback:true))
+        }.run(PortableRunner(port:8099,loopback:true))
     }
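
For readers following the PInput/POutput change described in the commit message, here is a minimal, illustrative sketch of how the new struct-based types are used from a pardo closure. The pipeline fragment and the `lines` collection below are hypothetical and are not part of this commit:

    // Hypothetical fragment; assumes `lines` is a PCollection<String> built earlier in a Pipeline.
    // PInput exposes the element together with its timestamp and window as plain stored properties;
    // POutput.emit re-uses the input's timestamp and window unless they are explicitly overridden.
    let counts: PCollection<KV<String,Int>> = lines
        .pardo(name:"SplitWords") { (input: PInput<String>, output: POutput<KV<String,Int>>) in
            for word in input.value.split(separator: " ") {
                output.emit(KV(String(word), 1))
            }
        }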
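
The coder changes above also move the element timestamp onto the big-endian "instant" layout used by the Java SDK, as the "big endian to match java" comment notes: milliseconds since the epoch shifted by Int64.min. A standalone sketch of that round trip, using hypothetical helper names rather than the Data extensions added in this commit:

    import Foundation

    // Encode: shift millis by Int64.min (subtract -9223372036854775808 with wraparound), then byte-swap to big endian.
    func encodeInstant(_ date: Date) -> Int64 {
        let millis = Int64((date.timeIntervalSince1970 * 1000.0).rounded())
        return (millis &- Int64.min).bigEndian
    }

    // Decode: undo the byte order, then undo the shift.
    func decodeInstant(_ raw: Int64) -> Date {
        let millis = Int64(bigEndian: raw) &+ Int64.min
        return Date(timeIntervalSince1970: Double(millis) / 1000.0)
    }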
