This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c13e652d2a4051e9d846d9dd31cbd0fc3447bdd8
Author: Byron Ellis <[email protected]>
AuthorDate: Fri Aug 25 13:41:43 2023 -0700

    Changes to allow the Swift SDK to operate successfully with the Flink Portable 
Runner as well as the Python Portable Runner. Modified the PInput/POutput 
functionality to just be structs; this allows us to use them both for closures 
and for the eventual DoFn interface, and cleans up the function signatures.
---
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |  8 ++-
 .../Sources/ApacheBeam/Coders/Coder+Encoding.swift |  6 +-
 sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift   | 29 ++++++++-
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |  2 -
 .../ApacheBeam/Internal/Data+Decoding.swift        |  7 ++-
 .../ApacheBeam/Internal/Data+Encoding.swift        |  6 ++
 .../ApacheBeam/Internal/Date+Timestamp.swift       |  2 +-
 .../Runtime/Bundle/BundleProcessor.swift           | 19 ++++--
 .../Sources/ApacheBeam/Runtime/Bundle/Sink.swift   |  1 +
 .../Sources/ApacheBeam/Runtime/Bundle/Source.swift | 14 ++++-
 .../ApacheBeam/Runtime/DataplaneClient.swift       | 41 +++++++++++--
 .../Sources/ApacheBeam/Runtime/Worker/Worker.swift | 11 +++-
 .../ApacheBeam/Transforms/BuiltIn+Elements.swift   | 71 +++-------------------
 .../Sources/ApacheBeam/Transforms/Combining.swift  |  3 +-
 .../ApacheBeamTests/Pipeline/FileIOTests.swift     |  9 +--
 .../Pipeline/IntegrationTests.swift                |  2 +-
 16 files changed, 137 insertions(+), 94 deletions(-)

diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 9eef4113576..043b6690981 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -52,8 +52,10 @@ public extension Coder {
             let length = try data.next(Int32.self).byteSwapped
             return .array(try (0..<length).map({ _ in try coder.decode(&data) 
}))
         case let .windowedvalue(valueCoder, windowCoder):
-            let timestamp = try data.next(Int64.self).byteSwapped &+ 
Int64(-9223372036854775808)
-            let windowCount = try data.next(Int32.self).byteSwapped
+            // This will be big endian to match java
+            let timestamp = try data.instant()
+
+            let windowCount = Int32(bigEndian: try data.next(Int32.self))
             if windowCount > 1 {
                 throw ApacheBeamError.runtimeError("Windowed values with > 1 
window not yet supported")
             }
@@ -72,7 +74,7 @@ public extension Coder {
             default:
                 throw ApacheBeamError.runtimeError("Invalid pane encoding 
\(String(pane,radix:2))")
             }
-            return .windowed(try valueCoder.decode(&data), 
Date(millisecondsSince1970: timestamp), pane, window)
+            return .windowed(try valueCoder.decode(&data), timestamp, pane, 
window)
         default:
             throw ApacheBeamError.runtimeError("Decoding of \(self.urn) coders 
not supported.")
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
index fffeb351a53..5076275dffc 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift
@@ -90,12 +90,12 @@ public extension Coder {
         case let .windowedvalue(valueCoder, windowCoder):
             if let (v,ts,w) = value as? (Any,Date,Window) {
                 //Timestamp
-                data.next( (ts.millisecondsSince1970 &- 
Int64(-9223372036854775808)).bigEndian )
+                data.instant(ts)
                 switch w {
                 case .global:
-                    data.next(Int32(0))
+                    data.next(Int32(1).bigEndian)
                 default:
-                    data.next(Int32(1))
+                    data.next(Int32(1).bigEndian)
                     try windowCoder.encode(w,data:&data)
                 }
                 // TODO: Real Panes
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
index 713b1de6796..28b809e5c53 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/DoFn.swift
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import Logging
+import Foundation
 
 /// A higher level interface to SerializableFn using dependency injected 
dynamic properties in the same
 /// way as we define Composite PTransforms
@@ -29,4 +29,31 @@ public extension DoFn {
     func finishBundle() async throws { }
 }
 
+public struct PInput<Of> {
+    public let value: Of
+    public let timestamp: Date
+    public let window: Window
+    
+    public init(_ value: Of,_ timestamp: Date,_ window: Window) {
+        self.value = value
+        self.timestamp = timestamp
+        self.window = window
+    }
+    
+    public init(_ element: (Of,Date,Window)) {
+        self.value = element.0
+        self.timestamp = element.1
+        self.window = element.2
+    }
+    
+}
 
+public struct POutput<Of> {
+    let stream: PCollectionStream<Of>
+    let timestamp: Date
+    let window: Window
+    
+    func emit(_ value: Of,timestamp: Date? = nil,window: Window? = nil) {
+        stream.emit(value,timestamp: timestamp ?? self.timestamp,window: 
window ?? self.window)
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index 825680cfa4c..34e404eca83 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -211,7 +211,6 @@ public final class Pipeline {
                                         $0.spec = .with {
                                             $0.urn = .transformUrn("impulse")
                                         }
-                                        $0.environmentID = 
defaultEnvironment.name
                                     }
                             }
                             rootIds.append(p.name)
@@ -232,7 +231,6 @@ public final class Pipeline {
                                         $0.spec = .with {
                                             $0.urn = 
.transformUrn("group_by_key")
                                         }
-                                        $0.environmentID = 
defaultEnvironment.name
                                     }
                             }
                             rootIds.append(p.name)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
index 978898fb51f..6487063e0e3 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
@@ -68,6 +68,11 @@ extension Data {
         return result
     }
     
+    mutating func instant() throws -> Date {
+        let millis = Int64(bigEndian:try next(Int64.self)) &+ 
Int64(-9223372036854775808)
+        return Date(millisecondsSince1970: millis)
+    }
+    
     /// Read a length prefixed chunk of data
     mutating func subdata() throws -> Data {
         let length = try self.varint()
@@ -80,7 +85,7 @@ extension Data {
     mutating func next<T>(_ type: T.Type) throws -> T {
         let size = MemoryLayout<T>.size
         let result = self.withUnsafeBytes {
-            $0.baseAddress!.withMemoryRebound(to: T.self, capacity: 1) { 
$0.pointee }
+            $0.load(as: T.self)
         }
         self = self.safeAdvance(by: size)
         return result
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
index 86007b21104..1a947162251 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
@@ -27,6 +27,12 @@ extension Data {
         self.append(UInt8(current))
     }
     
+    mutating func instant(_ value: Date) {
+        Swift.withUnsafeBytes(of:(Int64(value.millisecondsSince1970) &- 
Int64(-9223372036854775808)).bigEndian) {
+            self.append(contentsOf: $0)
+        }
+    }
+    
     mutating func next<T>(_ value: T) {
         Swift.withUnsafeBytes(of:value) {
             self.append(contentsOf: $0)
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
index 16eb658d050..9c3424584ef 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
@@ -22,6 +22,6 @@ extension Date {
         Int64((self.timeIntervalSince1970 * 1000.0).rounded())
     }
     init(millisecondsSince1970: Int64) {
-        self = Date(timeIntervalSince1970: TimeInterval(millisecondsSince1970) 
/ 1000)
+        self = Date(timeIntervalSince1970: Double(millisecondsSince1970) / 
1000.0)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 4e977d6f112..1e46b3e723d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -37,7 +37,7 @@ struct BundleProcessor {
          
descriptor:Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
          collections: [String:AnyPCollection],
          fns: [String:SerializableFn]) throws {
-        self.log = Logging.Logger(label: "BundleProcessor(\(descriptor.id))")
+        self.log = Logging.Logger(label: "BundleProcessor(\(id) 
\(descriptor.id))")
         
         var temp: [Step] = []
         var coders =  BundleCoderContainer(bundle:descriptor)
@@ -60,7 +60,7 @@ struct BundleProcessor {
         
         
         
-        for (_,transform) in descriptor.transforms {
+        for (transformId,transform) in descriptor.transforms {
             let urn = transform.spec.urn
             //Map the input and output streams in the correct order
             let inputs = transform.inputs.sorted().map { streams[$0.1]! }
@@ -69,8 +69,9 @@ struct BundleProcessor {
             if urn == "beam:runner:source:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: 
transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Source '\(transformId)','\(transform.uniqueName)' 
\(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : 
transform.uniqueName,
                     fn:Source(client: try .client(for: 
ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), 
coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -79,8 +80,9 @@ struct BundleProcessor {
             } else if urn == "beam:runner:sink:v1" {
                 let remotePort = try RemoteGrpcPort(serializedData: 
transform.spec.payload)
                 let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                log.info("Sink '\(transformId)','\(transform.uniqueName)' 
\(remotePort) \(coder)")
                 temp.append(Step(
-                    transformId: transform.uniqueName,
+                    transformId: transform.uniqueName == "" ? transformId : 
transform.uniqueName,
                     fn:Sink(client: try .client(for: 
ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), 
coder: coder),
                     inputs:inputs,
                     outputs:outputs,
@@ -108,16 +110,23 @@ struct BundleProcessor {
     
     public func process(instruction: String,responder: 
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
 async {
         _ = await withThrowingTaskGroup(of: (String,String).self) { group in
+            log.info("Starting bundle processing for \(instruction)")
+            var count: Int = 0
             do {
                 for step in self.steps {
+                    log.info("Starting Task \(step.transformId)")
                     let context = SerializableFnBundleContext(instruction: 
instruction, transform: step.transformId, payload: step.payload, log: self.log)
                     group.addTask {
                         return try await step.fn.process(context: context, 
inputs: step.inputs, outputs: step.outputs)
                     }
+                    count += 1
                 }
+                var finished: Int = 0
                 for try await (instruction,transform) in group {
-                    log.info("Task Completed (\(instruction),\(transform))")
+                    finished += 1
+                    log.info("Task Completed (\(instruction),\(transform)) 
\(finished) of \(count)")
                 }
+                log.info("All tasks completed for \(instruction)")
                 responder.yield(.with {
                     $0.instructionID = instruction
                 })
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
index 19ad99c7d99..620a6758261 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -39,6 +39,7 @@ final class Sink : SerializableFn {
         }
         emitter.yield(.last(context.instruction, context.transform))
         emitter.finish()
+        await client.finalizeStream(instruction: context.instruction, 
transform: context.transform)
         return (context.instruction,context.transform)
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
index fd2888ad027..90d28057f48 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -17,25 +17,32 @@
  */
 
 import Foundation
+import Logging
 
 /// Custom SerializableFn that reads/writes from an external data stream using 
a defined coder. It assumes that a given
 /// data element might contain more than one coder
 final class Source : SerializableFn {
-
+    
     let client: DataplaneClient
     let coder: Coder
+    let log: Logger
     
     public init(client: DataplaneClient,coder:Coder) {
         self.client = client
         self.coder = coder
-
+        self.log = Logger(label:"Source")
     }
     
     
     func process(context: SerializableFnBundleContext,
                  inputs: [AnyPCollectionStream], outputs: 
[AnyPCollectionStream]) async throws -> (String, String) {
+        log.info("Waiting for input on 
\(context.instruction)-\(context.transform)")
         let (stream,_) = await client.makeStream(instruction: 
context.instruction, transform: context.transform)
+        
+        var messages: Int = 0
+        var count: Int = 0
         for await message in stream {
+            messages += 1
             switch message {
             case let .data(data):
                 var d = data
@@ -43,6 +50,7 @@ final class Source : SerializableFn {
                     let value = try coder.decode(&d)
                     for output in outputs {
                         try output.emit(value: value)
+                        count += 1
                     }
                 }
             case let .last(id, transform):
@@ -50,9 +58,11 @@ final class Source : SerializableFn {
                     output.finish()
                 }
                 await client.finalizeStream(instruction: id, transform: 
transform)
+                log.info("Source \(context.instruction),\(context.transform) 
handled \(count) items over \(messages) messages")
                 return (id,transform)
             //TODO: Handle timer messages
             default:
+                log.info("Unhanled message \(message)")
                 break
             }
         }
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
index da820e0675b..254842e2d9d 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
@@ -67,7 +67,7 @@ actor DataplaneClient {
     private var streams: 
[Pair:(Stream,Stream.Continuation,MultiplexContinuation)] = [:]
     private let flush: Int
 
-    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=100) throws {
+    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=1000) throws 
{
         self.id  = id
         self.log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))")
         self.multiplex = AsyncStream.makeStream(of:Multiplex.self)
@@ -80,7 +80,9 @@ actor DataplaneClient {
             log.info("Initiating data plane multiplexing.")
             
             let input = multiplex.0
-
+            var count: Int = 0
+            var flushes: Int = 0
+            
             var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
             for try await element in input {
                 var shouldFlush: Bool = false
@@ -92,6 +94,7 @@ actor DataplaneClient {
                         $0.transformID = element.transform
                         $0.data = payload
                     })
+                    count += 1
                 case let .timer(family, payload):
                     elements.timers.append(.with {
                         $0.instructionID = element.id
@@ -99,6 +102,7 @@ actor DataplaneClient {
                         $0.timerFamilyID = family
                         $0.timers = payload
                     })
+                    count += 1
                 case let .last(id, transform):
                     elements.data.append(.with {
                         $0.instructionID = id
@@ -106,19 +110,36 @@ actor DataplaneClient {
                         $0.isLast = true
                     })
                     shouldFlush = true
+                    count += 1
                 case .flush:
                     shouldFlush = true
                 }
                 if shouldFlush || elements.data.count + elements.timers.count 
>= flush {
                     do {
+                        if case .last = element.message {
+                            log.info("Got last message, flushing 
\(elements.data.count + elements.timers.count) elements to data plane")
+                        }
                         try await stream.requestStream.send(elements)
                     } catch {
                         log.error("Unable to multiplex elements onto data 
plane: \(error)")
                     }
                     elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
                     shouldFlush = false
+                    flushes += 1
+                }
+                if count % 50000 == 0 && count > 0 {
+                    log.info("Processed \(count) elements (\(flushes) 
flushes)")
                 }
             }
+            if(elements.data.count + elements.timers.count > 0) {
+                do {
+                    log.info("Flushing final elements to data plane.")
+                    try await stream.requestStream.send(elements)
+                } catch {
+                    log.error("Unable to multiplex final elements onto data 
plane: \(error)")
+                }
+            }
+            log.info("Shutting down dataplane multiplexing")
         }
 
         // Demux task
@@ -132,7 +153,6 @@ actor DataplaneClient {
                     
                     for element in elements.data {
                         let key = Pair(id: element.instructionID, transform: 
element.transformID)
-                        //Drop zero-length elements
                         if element.data.count > 0 {
                             
messages[key,default:[]].append(.data(element.data))
                         }
@@ -159,15 +179,20 @@ actor DataplaneClient {
                         }
                     }
                     // Send any last messages
+                    // TODO: Fix known race here. We try to re-use streams 
across bundles which can lead to a race where yield is sent too early.
                     for (key,value) in last {
                         let output = await self.makeStream(key: key).1
                         output.yield(value)
                     }
                 }
             } catch {
-                log.error("Lost data plane connection.")
+                log.error("Lost data plane connection. Closing all outstanding 
streams")
+                for (id,stream) in await streams {
+                    log.info("Closing stream \(id)")
+                    stream.1.finish()
+                }
             }
-            
+            multiplex.1.finish()
         }
     }
     
@@ -192,7 +217,11 @@ actor DataplaneClient {
     }
     
     func finalizeStream(instruction:String,transform:String) {
-        //TODO: Implement finalization.
+        let key = Pair(id:instruction,transform:transform)
+        log.info("Done with stream \(key)")
+        if let element = streams.removeValue(forKey: key) {
+            element.1.finish()
+        }
     }
 
     
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 08204a11184..1a19f930f2c 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -52,6 +52,7 @@ actor Worker {
         //Start the response task. This will continue until a yield call is 
sent from responder
         Task {
             for await r in responses {
+                log.info("Sending response \(r)")
                 try await control.requestStream.send(r)
             }
         }
@@ -75,8 +76,14 @@ actor Worker {
             for try await instruction in control.responseStream {
                 switch instruction.request {
                 case .processBundle(let pbr):
-                    try await processor(for:pbr.processBundleDescriptorID)
-                        .process(instruction: 
instruction.instructionID,responder:responder)
+                    do {
+                        let p = try await 
processor(for:pbr.processBundleDescriptorID)
+                        Task {
+                            await p.process(instruction: 
instruction.instructionID,responder:responder)
+                        }
+                    } catch {
+                        log.error("Unable to process bundle 
\(pbr.processBundleDescriptorID): \(error)")
+                    }
                     break
                 default:
                     log.warning("Ignoring instruction 
\(instruction.instructionID). Not yet implemented.")
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
index e29ea085b7d..7febd84895a 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/BuiltIn+Elements.swift
@@ -18,87 +18,34 @@
 
 import Foundation
 
-public protocol PInput<Of> {
-    associatedtype Of
-    
-    var value: Of { get }
-    var timestamp: Date { get }
-    var window: Window { get }
-}
-
-public protocol POutput<Of> {
-    associatedtype Of
-    
-    func emit(_ value: Of,timestamp: Date,window: Window)
-    func emit(_ value: Of)
-    func emit(_ value: Of,timestamp: Date)
-    func emit(_ value: Of,window: Window)
-}
-
-
-struct PardoInput<Of> : PInput {
-    let value: Of
-    let timestamp: Date
-    let window: Window
-    public init(_ value:(Of,Date,Window)) {
-        self.value = value.0
-        self.timestamp = value.1
-        self.window = value.2
-    }
-}
-
-struct PardoOutput<Of> : POutput {
-    
-    
-    let stream: PCollectionStream<Of>
-    let timestamp: Date
-    let window: Window
-
-    func emit(_ value: Of, timestamp: Date, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-    func emit(_ value: Of, timestamp: Date) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }    
-    func emit(_ value: Of, window: Window) {
-        stream.emit(value,timestamp:timestamp,window:window)
-
-    }
-    func emit(_ value: Of) {
-        stream.emit(value,timestamp:timestamp,window:window)
-    }
-
-    
-}
-
 public extension PCollection {
     
     // No Output
-    func pardo(name:String,_ fn: @Sendable @escaping (any PInput<Of>) async 
throws -> Void) {
+    func pardo(name:String,_ fn: @Sendable @escaping (PInput<Of>) async throws 
-> Void) {
         pstream(name:name) { input in
             for try await element in input {
-                try await fn(PardoInput(element))
+                try await fn(PInput(element))
             }
         }
     }
     
     // One Output
-    func pardo<O0>(name:String,_ fn: @Sendable @escaping (any PInput<Of>,any 
POutput<O0>) async throws -> Void) -> PCollection<O0> {
+    func pardo<O0>(name:String,_ fn: @Sendable @escaping 
(PInput<Of>,POutput<O0>) async throws -> Void) -> PCollection<O0> {
         pstream(name:name) { input,output in
             for try await element in input {
-                try await 
fn(PardoInput(element),PardoOutput(stream:output,timestamp:element.1,window:element.2))
+                try await fn(PInput(element),
+                             
POutput(stream:output,timestamp:element.1,window:element.2))
             }
         }
     }
     
     //Two Outputs
-    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping (any 
PInput<Of>,any POutput<O0>,any POutput<O1>) async throws -> Void) -> 
(PCollection<O0>,PCollection<O1>) {
+    func pardo<O0,O1>(name:String,_ fn: @Sendable @escaping 
(PInput<Of>,POutput<O0>,POutput<O1>) async throws -> Void) -> 
(PCollection<O0>,PCollection<O1>) {
         pstream(name:name) { input,o0,o1 in
             for try await element in input {
-                try await fn(PardoInput(element),
-                             
PardoOutput(stream:o0,timestamp:element.1,window:element.2),
-                             
PardoOutput(stream:o1,timestamp:element.1,window:element.2)
+                try await fn(PInput(element),
+                             
POutput(stream:o0,timestamp:element.1,window:element.2),
+                             
POutput(stream:o1,timestamp:element.1,window:element.2)
                              
                 )
             }
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
index 7abd847b8b5..a03f3e6ec11 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Combining.swift
@@ -25,7 +25,8 @@ public extension PCollection {
                 for v in kv.values {
                     accumulator(v,&result)
                 }
-                output.emit(KV(kv.key,result),timestamp:ts,window:w)
+                let intermediate = KV(kv.key,result)
+                output.emit(intermediate,timestamp:ts,window:w)
             }
                 
         }
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index b4ffa125b08..142f264bd99 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -30,8 +30,8 @@ final class FileIOTests: XCTestCase {
     }
 
     func testGoogleStorageReadFiles() async throws {
+        throw XCTSkip()
         try await 
PCollectionTest(PCollection<KV<String,String>>().readFiles(in: 
GoogleStorage.self)) { log,inputs,outputs in
-            throw XCTSkip()
             log.info("Sending value")
             try 
inputs[0].emit(value:KV("dataflow-samples","shakespeare/asyoulikeit.txt"))
             log.info("Value sent")
@@ -43,11 +43,13 @@ final class FileIOTests: XCTestCase {
     }
 
     func testShakespeareWordcount() async throws {
+        //throw XCTSkip()
         try await Pipeline { pipeline in
             let contents = pipeline
                 .create(["dataflow-samples/shakespeare"])
                 .map({ value in
                     let parts = value.split(separator: "/",maxSplits: 1)
+                    print("Got filename \(parts) from \(value)")
                     return KV(parts[0].lowercased(),parts[1].lowercased())
                 })
                 .listFiles(in: GoogleStorage.self)
@@ -64,10 +66,9 @@ final class FileIOTests: XCTestCase {
             
             // Our first group by operation
             let baseCount = lines
-                .flatMap({ $0.components(separatedBy: .whitespaces) })
+                .flatMap({ (line:String) in line.components(separatedBy: 
.whitespaces) })
                 .groupBy({ ($0,1) })
                 .sum()
-                .log(prefix:"INTERMEDIATE OUTPUT")
             
             let normalizedCounts = baseCount.groupBy {
                 ($0.key.lowercased().trimmingCharacters(in: 
.punctuationCharacters),
@@ -76,7 +77,7 @@ final class FileIOTests: XCTestCase {
             
             normalizedCounts.log(prefix:"COUNT OUTPUT")
             
-        }.run(PortableRunner(loopback:true))
+        }.run(PortableRunner(port:8099,loopback:true))
     }
     
 
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 27e5972c1e7..0ea5d962d92 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -83,7 +83,7 @@ final class IntegrationTests: XCTestCase {
             normalizedCounts.log(prefix:"COUNT OUTPUT")
             errors.log(prefix:"ERROR OUTPUT")
             
-        }.run(PortableRunner(loopback:true)) 
+        }.run(PortableRunner(port:8099,loopback:true)) 
         
         
     }

Reply via email to