This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch swift-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f3f519d11d93923bf5b09aeb55ce1e1fca964d34
Author: Byron Ellis <[email protected]>
AuthorDate: Wed Aug 16 18:22:27 2023 -0700

    Add the rest of the code. Basic word count integration test now completes 
successfully with the Python Portable Runner.
---
 .../ApacheBeam/Client/ApiServiceDescriptor.swift   |  38 +++-
 .../Sources/ApacheBeam/Coders/Coder+Decoding.swift |   2 +-
 sdks/swift/Sources/ApacheBeam/Coders/Coder.swift   |  16 +-
 .../Sources/ApacheBeam/Core/ArtifactInfo.swift     |   8 +-
 .../Sources/ApacheBeam/Core/Environment.swift      |  50 ++++++
 .../ApacheBeam/Core/Fn/SerializableFn.swift        |   2 +-
 sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift  |   2 +-
 .../Core/PCollection/AnyPCollectionStream.swift    |   4 +
 .../Core/PCollection/PCollectionStream.swift       |  22 +--
 .../ApacheBeam/Core/Pipeline/Pipeline.swift        |  18 +-
 .../ApacheBeam/Core/Pipeline/PipelineContext.swift |   6 +-
 .../ApacheBeam/Core/Pipeline/PipelineRunner.swift  |   3 +
 .../ApacheBeam/Internal/Dictionary+Helpers.swift   |   6 +
 .../Internal/PipelineProto+Environment.swift       |  32 +---
 .../ApacheBeam/Internal/ProtoConversion.swift      |   2 +-
 .../ApacheBeam/Runners/PortableRunner.swift        |  59 +++++++
 .../Runtime/Bundle/BundleProcessor.swift           | 113 ++++++++++++
 .../Sources/ApacheBeam/Runtime/Bundle/Sink.swift   |  26 +++
 .../Sources/ApacheBeam/Runtime/Bundle/Source.swift |  43 +++++
 .../ApacheBeam/Runtime/DataplaneClient.swift       | 192 +++++++++++++++++++++
 .../Sources/ApacheBeam/Runtime/Worker/Worker.swift |  72 +++++++-
 .../ApacheBeam/Runtime/Worker/WorkerProvider.swift |  37 +++-
 .../Sources/ApacheBeam/Transforms/Basic.swift      |   3 +
 .../ApacheBeamTests/Pipeline/Fixtures/file1.txt    |   4 +
 .../ApacheBeamTests/Pipeline/Fixtures/file2.txt    |   3 +
 .../Pipeline/IntegrationTests.swift                |  11 +-
 26 files changed, 684 insertions(+), 90 deletions(-)

diff --git a/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift 
b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift
index 40e1b662d8f..18d95403ebc 100644
--- a/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift
+++ b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift
@@ -16,6 +16,10 @@
  * limitations under the License.
  */
 
+import GRPC
+import NIOCore
+
+
 /// Representation of the API Service Descriptors used to communicate with 
runners (and vice versa)
 public struct ApiServiceDescriptor {
     
@@ -41,16 +45,15 @@ extension ApiServiceDescriptor {
 
 extension ApiServiceDescriptor : ProtoConversion {
     
-    typealias Proto = Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor
-    var proto: Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor {
-        .with { $0.url = self.url }
+    func populate(_ proto: inout 
Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor) throws {
+        proto.url = self.url
     }
+}
 
+extension ApiServiceDescriptor : Hashable {
     
 }
 
-
-
 public extension ApiServiceDescriptor {
     static func from(env:String,format:EncodedAs = .textproto) throws -> 
ApiServiceDescriptor {
         switch format {
@@ -61,3 +64,28 @@ public extension ApiServiceDescriptor {
         }
     }
 }
+
+public extension GRPCChannelPool {
+    static func with(endpoint:ApiServiceDescriptor, eventLoopGroup: 
EventLoopGroup) throws -> GRPCChannel {
+        let url = endpoint.url
+        //TODO: Transport Security configuration
+        if(url.starts(with: "unix://")) {
+            return try GRPCChannelPool.with(target: 
.unixDomainSocket(url.replacing("unix://",with:"")),
+                                        transportSecurity: .plaintext,
+                                        eventLoopGroup: eventLoopGroup)
+        } else {
+            if let lastNdx = url.lastIndex(of: ":") {
+                guard lastNdx.utf16Offset(in: url) > 0 else {
+                    throw ApacheBeamError.runtimeError("Service URL must be of 
the form host:port")
+                }
+                let host = String(url.prefix(upTo: lastNdx))
+                let port = Int(url.suffix(from:url.index(lastNdx,offsetBy:1)))!
+                return try GRPCChannelPool.with(target: .host(host, port: 
port),
+                                                transportSecurity: .plaintext,
+                                                eventLoopGroup: eventLoopGroup)
+            } else {
+                throw ApacheBeamError.runtimeError("Service URL must be of the 
form host:port")
+            }
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
index 31b97f0d318..9eef4113576 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift
@@ -49,7 +49,7 @@ public extension Coder {
         case let .keyvalue(keyCoder, valueCoder):
             return .kv(try keyCoder.decode(&data), try 
valueCoder.decode(&data))
         case let .iterable(coder):
-            let length = try data.next(Int32.self)
+            let length = try data.next(Int32.self).byteSwapped
             return .array(try (0..<length).map({ _ in try coder.decode(&data) 
}))
         case let .windowedvalue(valueCoder, windowCoder):
             let timestamp = try data.next(Int64.self).byteSwapped &+ 
Int64(-9223372036854775808)
diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift 
b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
index ceb2649243e..bf1ead4f6d8 100644
--- a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
+++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift
@@ -155,7 +155,7 @@ struct BundleCoderContainer : CoderContainer {
 }
 
 extension Coder {
-    static func of(name:String,in container:CoderContainer) -> Coder? {
+    static func of(name:String,in container:CoderContainer) throws -> Coder {
         if let baseCoder = container[name] {
             switch baseCoder.spec.urn {
             case "beam:coder:bytes:v1":
@@ -167,24 +167,24 @@ extension Coder {
             case "beam:coder:double:v1":
                 return .double
             case "beam:coder:iterable:v1":
-                return .iterable(.of(name: baseCoder.componentCoderIds[0], in: 
container)!)
+                return .iterable(try .of(name: baseCoder.componentCoderIds[0], 
in: container))
             case "beam:coder:kv:v1":
                 return .keyvalue(
-                    .of(name: baseCoder.componentCoderIds[0],in:container)!,
-                    .of(name: baseCoder.componentCoderIds[1], in:container)!)
+                    try .of(name: baseCoder.componentCoderIds[0],in:container),
+                    try .of(name: baseCoder.componentCoderIds[1], 
in:container))
             case "beam:coder:global_window:v1":
                 return .globalwindow
             case "beam:coder:windowed_value:v1":
                 return .windowedvalue(
-                    .of(name: baseCoder.componentCoderIds[0],in:container)!,
-                    .of(name: baseCoder.componentCoderIds[1],in:container)!)
+                    try .of(name: baseCoder.componentCoderIds[0],in:container),
+                    try .of(name: baseCoder.componentCoderIds[1],in:container))
             case "beam:coder:length_prefix:v1":
-                return .lengthprefix(.of(name: 
baseCoder.componentCoderIds[0],in:container)!)
+                return .lengthprefix(try .of(name: 
baseCoder.componentCoderIds[0],in:container))
             default:
                 return .unknown(baseCoder.spec.urn)
             }
         } else {
-            return nil
+            throw ApacheBeamError.runtimeError("Unable to locate coder 
\(name) in container.")
         }
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift 
b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift
index 73c6112d680..ac5d26695b5 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift
@@ -22,10 +22,8 @@ public struct ArtifactInfo {
 }
 
 extension ArtifactInfo : ProtoConversion {
-    var proto: Org_Apache_Beam_Model_Pipeline_V1_ArtifactInformation {
-        .with {
-            $0.roleUrn = .beamUrn(role,type:"artifact:type")
-            $0.typeUrn = .beamUrn(type,type:"artifact:role")
-        }
+    func populate(_ proto: inout 
Org_Apache_Beam_Model_Pipeline_V1_ArtifactInformation) throws {
+        proto.roleUrn = .beamUrn(role,type:"artifact:role")
+        proto.typeUrn = .beamUrn(type,type:"artifact:type")
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
index fd09bf454e2..fe5f19d6fa6 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift
@@ -42,3 +42,53 @@ public struct Environment {
     }
 }
 
+extension Environment : ProtoConversion {
+    
+    func populate(_ proto: inout EnvironmentProto) throws {
+        proto.urn = switch category {
+        case .docker(_): .beamUrn("docker",type:"env")
+        case .system: .beamUrn("default",type:"env")
+        case .process(_, _, _, _): .beamUrn("process",type:"env")
+        case .external(_): .beamUrn("external",type:"env")
+        }
+        proto.capabilities = capabilities
+        
+        proto.dependencies = try dependencies.map({ artifact in
+            try .with {
+                try artifact.populate(&$0)
+            }
+        })
+            
+        if case let .docker(containerImage) = category {
+            proto.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with {
+                $0.containerImage = containerImage
+            }.serializedData()
+        }
+        
+        if case let .external(endpoint) = category {
+            proto.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with {
+                $0.endpoint = try .with {
+                    try endpoint.populate(&$0)
+                }
+            }.serializedData()
+        }
+        
+        if case let .process(command,arch,os,env) = category {
+            proto.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with {
+                $0.arch = arch
+                $0.command = command
+                $0.os = os
+                $0.env = env
+            }.serializedData()
+        }
+    }
+    
+    public static func docker(_ imageName: 
String,capabilities:[String]=[],dependencies:[ArtifactInfo]=[]) -> Environment {
+        
Environment(.docker(imageName),capabilities:capabilities,dependencies:dependencies)
+    }
+    
+    public static func external(_ endpoint: 
ApiServiceDescriptor,capabilities:[String]=[],dependencies:[ArtifactInfo]=[]) 
-> Environment {
+        
Environment(.external(endpoint),capabilities:capabilities,dependencies:dependencies)
+    }
+
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index 717b604deb1..80d23bbcc09 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -22,7 +22,7 @@ public struct SerializableFnBundleContext {
     let instruction:String
     let transform:String
     let payload:Data
-    let log:Logger
+    let log:Logging.Logger
 }
 
 /// SerialiableFn is a protocol for functions that should be parameterized for 
the pipeline. This is intended as a fairly low level class and users
diff --git a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift 
b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
index 278ccb609cb..237abc8c645 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
@@ -35,7 +35,7 @@ public struct KV<Key,Value> : AnyKeyValue {
         self.values = [value]
     }
     
-    public init(_ key: Key,_ values: [Value]) {
+    public init(_ key: Key,values: [Value]) {
         self.key = key
         self.values = values
     }
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
index df11ecd3780..4e5ea0bfb0c 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/AnyPCollectionStream.swift
@@ -78,6 +78,10 @@ public struct AnyPCollectionStream : AsyncSequence {
         return value as! PCollectionStream<Out>
     }
     
+    public func emit(value element: Any) throws {
+        try emitClosure(value,element)
+    }
+    
     public func finish() {
         finishClosure(self.value)
     }
diff --git 
a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift 
b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index 1d2b3cbe151..38b8f795ac1 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -52,7 +52,7 @@ public final class PCollectionStream<Of> : AsyncSequence {
     // a more clever way of doing this, but I don't know what it is.
     
     func emit<K,V>(key: K,value: [V],timestamp: Date,window:Window) {
-        emit(KV(key,value) as! Of,timestamp:timestamp,window:window)
+        emit(KV(key,values:value) as! Of,timestamp:timestamp,window:window)
     }
     
     func emit<K>(key: K,value: BeamValue,timestamp: Date,window:Window) throws 
{
@@ -63,11 +63,11 @@ public final class PCollectionStream<Of> : AsyncSequence {
         // won't be much in the way of new scalar values.
         if case .array(let array) = value {
             switch array.first {
-            case .boolean(_):emit(key:key,value:array.map({$0.baseValue as! 
Bool}),timestamp:timestamp,window:window)
-            case .bytes(_):  emit(key:key,value:array.map({$0.baseValue as! 
Data}),timestamp:timestamp,window:window)
-            case .double(_): emit(key:key,value:array.map({$0.baseValue as! 
Double}),timestamp:timestamp,window:window)
-            case .integer(_):emit(key:key,value:array.map({$0.baseValue as! 
Int}),timestamp:timestamp,window:window)
-            case .string(_): emit(key:key,value:array.map({$0.baseValue as! 
String}),timestamp:timestamp,window:window)
+            case .boolean(_):emit(key:key,value:array.map({$0.baseValue! as! 
Bool}),timestamp:timestamp,window:window)
+            case .bytes(_):  emit(key:key,value:array.map({$0.baseValue! as! 
Data}),timestamp:timestamp,window:window)
+            case .double(_): emit(key:key,value:array.map({$0.baseValue! as! 
Double}),timestamp:timestamp,window:window)
+            case .integer(_):emit(key:key,value:array.map({$0.baseValue! as! 
Int}),timestamp:timestamp,window:window)
+            case .string(_): emit(key:key,value:array.map({$0.baseValue! as! 
String}),timestamp:timestamp,window:window)
             default:
                 throw ApacheBeamError.runtimeError("Can't use 
\(String(describing:array.first)) as a value in a key value pair")
             }
@@ -90,11 +90,11 @@ public final class PCollectionStream<Of> : AsyncSequence {
         if case let .kv(key,value) = value {
             // Unwrap the key first
             switch key {
-            case let .boolean(v):try 
emit(key:v,value:value,timestamp:timestamp,window:window)
-            case let .bytes(v):  try 
emit(key:v,value:value,timestamp:timestamp,window:window)
-            case let .double(v): try 
emit(key:v,value:value,timestamp:timestamp,window:window)
-            case let .integer(v):try 
emit(key:v,value:value,timestamp:timestamp,window:window)
-            case let .string(v): try 
emit(key:v,value:value,timestamp:timestamp,window:window)
+            case let .boolean(v):try 
emit(key:v!,value:value,timestamp:timestamp,window:window)
+            case let .bytes(v):  try 
emit(key:v!,value:value,timestamp:timestamp,window:window)
+            case let .double(v): try 
emit(key:v!,value:value,timestamp:timestamp,window:window)
+            case let .integer(v):try 
emit(key:v!,value:value,timestamp:timestamp,window:window)
+            case let .string(v): try 
emit(key:v!,value:value,timestamp:timestamp,window:window)
             default:
                 throw ApacheBeamError.runtimeError("Can't use \(value) as a 
value in a key value pair")
             }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
index ece88ed6799..116da114b67 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/Pipeline.swift
@@ -10,15 +10,8 @@ public final class Pipeline {
         self.content = content
     }
     
-    public func run(loopback:Bool = false) throws {
-        let job = try context
-        log.info("\(job.proto)")
-        
-        var proto = job.proto
-        if loopback {
-            //If we are in loopback mode we want to replace the default 
environment
-        }
-        
+    public func run(_ runner: PipelineRunner) async throws {
+        try await runner.run(try self.context)        
     }
     
     
@@ -127,6 +120,7 @@ public final class Pipeline {
                             .with {
                                 $0.uniqueName = uniqueName("c")
                                 $0.coderID = coder.name
+                                $0.windowingStrategyID = defaultStrategy.name
                                 $0.isBounded = .bounded //TODO: Get this from 
the coder
                             }
                     }
@@ -248,12 +242,10 @@ public final class Pipeline {
                         }
                     }
                 }
-
-                
-                
+                proto.rootTransformIds = rootIds
                 
             }
-            return PipelineContext(pipeline,defaultEnvironment.name)
+            return 
PipelineContext(pipeline,defaultEnvironment.name,collections,fns)
         }
     }
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
index d944b86a7d6..a173d1850ff 100644
--- a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineContext.swift
@@ -2,10 +2,14 @@ public final class PipelineContext {
 
     var proto : PipelineProto
     let defaultEnvironmentId: String
+    let collections: [String:AnyPCollection]
+    let pardoFns: [String:SerializableFn]
 
-    init(_ proto:PipelineProto,_ defaultEnvironmentId: String) {
+    init(_ proto:PipelineProto,_ defaultEnvironmentId: String,_ collections: 
[String:AnyPCollection],_ fns:[String:SerializableFn]) {
         self.proto = proto
         self.defaultEnvironmentId = defaultEnvironmentId
+        self.collections = collections
+        self.pardoFns = fns
     }
     
 }
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift 
b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift
new file mode 100644
index 00000000000..4e5e4f55d73
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Pipeline/PipelineRunner.swift
@@ -0,0 +1,3 @@
+public protocol PipelineRunner {
+    func run(_ context: PipelineContext) async throws 
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift
new file mode 100644
index 00000000000..a129508e85f
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Dictionary+Helpers.swift
@@ -0,0 +1,6 @@
+public extension Dictionary where Key:Comparable {
+    /// Return key-value pairs sorted by key. 
+    func sorted() -> [(Key,Value)] {
+        self.map({ ($0,$1)}).sorted(by: { $0.0 < $1.0 })
+    }
+}
diff --git 
a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift
index 5207748237f..8e9d85d2a7a 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/PipelineProto+Environment.swift
@@ -2,36 +2,8 @@ extension PipelineProto {
     
     mutating func environment(from: Environment) throws -> PipelineComponent {
         return try environment { _ in
-            try .with { p in
-                p.urn = switch from.category {
-                case .docker(_): .beamUrn("docker",type:"env")
-                case .system: .beamUrn("default",type:"env")
-                case .process(_, _, _, _): .beamUrn("process",type:"env")
-                case .external(_): .beamUrn("external",type:"env")
-                }
-                p.capabilities = from.capabilities
-                p.dependencies = from.dependencies.map({ $0.proto })
-                
-                if case let .docker(containerImage) = from.category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_DockerPayload.with {
-                        $0.containerImage = containerImage
-                    }.serializedData()
-                }
-                
-                if case let .external(endpoint) = from.category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ExternalPayload.with {
-                        $0.endpoint = endpoint.proto
-                    }.serializedData()
-                }
-                
-                if case let .process(command,arch,os,env) = from.category {
-                    p.payload = try 
Org_Apache_Beam_Model_Pipeline_V1_ProcessPayload.with {
-                        $0.arch = arch
-                        $0.command = command
-                        $0.os = os
-                        $0.env = env
-                    }.serializedData()
-                }
+            try .with {
+                try from.populate(&$0)
             }
         }
     }
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift 
b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
index 2ab1cf85c66..d3342b3cffb 100644
--- a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
+++ b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
@@ -19,7 +19,7 @@
 protocol ProtoConversion {
     associatedtype Proto
     
-    var proto: Proto { get throws }
+    func populate(_ proto: inout Proto) throws
 }
 
 
diff --git a/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift 
b/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift
new file mode 100644
index 00000000000..67737e0bf1a
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runners/PortableRunner.swift
@@ -0,0 +1,59 @@
+import Logging
+import GRPC
+import NIOCore
+
+public struct PortableRunner : PipelineRunner {
+    let loopback:Bool
+    let log: Logging.Logger
+    let host: String
+    let port: Int
+    
+    public init(host:String="localhost",port:Int=8073,loopback:Bool=false) {
+        self.loopback = loopback
+        self.log = .init(label: "PortableRunner")
+        self.host = host
+        self.port = port
+    }
+    
+    public func run(_ context: PipelineContext) async throws {
+        var proto = context.proto
+        if loopback {
+            //If we are in loopback mode we want to replace the default 
environment
+            //with an external environment that points to our local worker 
server
+            let worker = try WorkerServer(context.collections,context.pardoFns)
+            log.info("Running in LOOPBACK mode with a worker server at 
\(worker.endpoint).")
+            proto.components.environments[context.defaultEnvironmentId] = try 
.with {
+                try Environment.external(worker.endpoint).populate(&$0)
+            }
+        }
+        log.info("\(proto)")
+        log.info("Connecting to Portable Runner at \(host):\(port).")
+        let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
+        let channel = try GRPCChannelPool.with(target: .host(host, port: port),
+                                           transportSecurity: .plaintext, 
eventLoopGroup: group)
+        let client = 
Org_Apache_Beam_Model_JobManagement_V1_JobServiceAsyncClient(channel: channel)
+        let prepared = try await client.prepare(.with {
+            $0.pipeline = proto
+        })
+        let job = try await client.run(.with {
+            $0.preparationID = prepared.preparationID
+        })
+        log.info("Submitted job \(job.jobID)")
+        var done = false
+        while !done {
+            let status = try await client.getState(.with {
+                $0.jobID = job.jobID
+            })
+            log.info("Job \(job.jobID) status: \(status.state)")
+            switch status.state {
+            case .stopped,.failed,.done:
+                done = true
+            default:
+                try await Task.sleep(for: .seconds(5))
+            }
+        }
+        log.info("Job completed.")
+        
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
new file mode 100644
index 00000000000..3863b8748f7
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -0,0 +1,113 @@
+import Logging
+
+import Foundation
+
+struct BundleProcessor {
+    let log: Logging.Logger
+    
+    struct Step {
+        let transformId: String
+        let fn: SerializableFn
+        let inputs: [AnyPCollectionStream]
+        let outputs: [AnyPCollectionStream]
+        let payload: Data
+    }
+    
+    let steps:[Step]
+    
+    init(id:String,
+         
descriptor:Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
+         collections: [String:AnyPCollection],
+         fns: [String:SerializableFn]) throws {
+        self.log = Logging.Logger(label: "BundleProcessor(\(descriptor.id))")
+        
+        var temp: [Step] = []
+        var coders =  BundleCoderContainer(bundle:descriptor)
+        
+        var streams: [String:AnyPCollectionStream] = [:]
+        // First make streams for everything in this bundle (maybe I could use 
the pcollection array for this?)
+        for (_,transform) in descriptor.transforms {
+            for id in transform.inputs.values {
+                if streams[id] == nil {
+                    streams[id] = collections[id]!.anyStream
+                }
+            }
+            for id in transform.outputs.values {
+                if streams[id] == nil {
+                    streams[id] = collections[id]!.anyStream
+                }
+            }
+        }
+        
+        
+        
+        for (_,transform) in descriptor.transforms {
+            let urn = transform.spec.urn
+            //Map the input and output streams in the correct order
+            let inputs = transform.inputs.sorted().map { streams[$0.1]! }
+            let outputs = transform.outputs.sorted().map { streams[$0.1]! }
+            if urn == "beam:runner:source:v1" {
+                let remotePort = try RemoteGrpcPort(serializedData: 
transform.spec.payload)
+                let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                temp.append(Step(
+                    transformId: transform.uniqueName,
+                    fn:Source(client: try .client(for: 
ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), 
coder: coder),
+                    inputs:inputs,
+                    outputs:outputs,
+                    payload:Data()
+                ))
+            } else if urn == "beam:runner:sink:v1" {
+                let remotePort = try RemoteGrpcPort(serializedData: 
transform.spec.payload)
+                let coder = try Coder.of(name: remotePort.coderID, in: coders)
+                temp.append(Step(
+                    transformId: transform.uniqueName,
+                    fn:Sink(client: try .client(for: 
ApiServiceDescriptor(proto:remotePort.apiServiceDescriptor), worker: id), 
coder: coder),
+                    inputs:inputs,
+                    outputs:outputs,
+                    payload:Data()
+                ))
+
+            } else if urn == "beam:transform:pardo:v1" {
+                let pardoPayload = try 
Org_Apache_Beam_Model_Pipeline_V1_ParDoPayload(serializedData: 
transform.spec.payload)
+                if let fn = fns[transform.uniqueName] {
+                    temp.append(Step(transformId: transform.uniqueName,
+                                     fn: fn,
+                                     inputs: inputs,
+                                     outputs: outputs,
+                                     payload: pardoPayload.doFn.payload))
+                } else {
+                    log.warning("Unable to map \(transform.uniqueName) to a 
known SerializableFn. Will be skipped during processing.")
+                }
+            } else {
+                log.warning("Unable to map \(urn). Will be skipped during 
processing.")
+            }
+            
+        }
+        self.steps = temp
+    }
+    
+    public func process(instruction: String,responder: 
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
 async {
+        _ = await withThrowingTaskGroup(of: (String,String).self) { group in
+            do {
+                for step in self.steps {
+                    let context = SerializableFnBundleContext(instruction: 
instruction, transform: step.transformId, payload: step.payload, log: self.log)
+                    group.addTask {
+                        return try await step.fn.process(context: context, 
inputs: step.inputs, outputs: step.outputs)
+                    }
+                }
+                for try await (instruction,transform) in group {
+                    log.info("Task Completed (\(instruction),\(transform))")
+                }
+                responder.yield(.with {
+                    $0.instructionID = instruction
+                })
+            } catch {
+                responder.yield(.with {
+                    $0.instructionID = instruction
+                    $0.error = "\(error)"
+                })
+            }
+        }
+    }
+    
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
new file mode 100644
index 00000000000..b399c13f1f6
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -0,0 +1,26 @@
+import Foundation
+
+final class Sink : SerializableFn {
+    let client: DataplaneClient
+    let coder: Coder
+    
+    public init(client: DataplaneClient,coder:Coder) {
+        self.client = client
+        self.coder = coder
+
+    }
+    
+    
+    func process(context: SerializableFnBundleContext,
+                 inputs: [AnyPCollectionStream], outputs: 
[AnyPCollectionStream]) async throws -> (String, String) {
+        let (_,emitter) = await client.makeStream(instruction: 
context.instruction, transform: context.transform)
+        for try await element in inputs[0] {
+            var output = Data()
+            try coder.encode(element, data: &output)
+            emitter.yield(.data(output))
+        }
+        emitter.yield(.last(context.instruction, context.transform))
+        emitter.finish()
+        return (context.instruction,context.transform)
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
new file mode 100644
index 00000000000..b53ed8794fa
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -0,0 +1,43 @@
+import Foundation
+
/// Custom SerializableFn that reads from an external data stream and decodes
/// elements using the supplied coder. A single data payload may contain more
/// than one encoded element, so decoding loops until the buffer is drained.
final class Source: SerializableFn {

    let client: DataplaneClient
    let coder: Coder

    public init(client: DataplaneClient, coder: Coder) {
        self.client = client
        self.coder = coder
    }

    /// Consumes the demultiplexed stream for this (instruction, transform)
    /// pair, emitting each decoded element to every output stream. On the
    /// `.last` marker all outputs are finished and the stream is finalized.
    func process(context: SerializableFnBundleContext,
                 inputs: [AnyPCollectionStream], outputs: [AnyPCollectionStream]) async throws -> (String, String) {
        // Only the receive side is used; the send continuation is discarded.
        let (receiver, _) = await client.makeStream(instruction: context.instruction, transform: context.transform)
        for await message in receiver {
            switch message {
            case .data(let payload):
                // One payload may hold several encoded elements; keep decoding
                // until the buffer is exhausted.
                var remaining = payload
                while !remaining.isEmpty {
                    let element = try coder.decode(&remaining)
                    for sink in outputs {
                        try sink.emit(value: element)
                    }
                }
            case .last(let id, let transform):
                // End of input: close every downstream stream before finalizing.
                outputs.forEach { $0.finish() }
                await client.finalizeStream(instruction: id, transform: transform)
                return (id, transform)
            default:
                //TODO: Handle timer messages
                break
            }
        }
        return (context.instruction, context.transform)
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
new file mode 100644
index 00000000000..ac959d3235d
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/DataplaneClient.swift
@@ -0,0 +1,192 @@
+import Foundation
+import GRPC
+import Logging
+
/// Client for handling the multiplexing and demultiplexing of Dataplane messages.
/// All logical (instruction, transform) streams share one gRPC data call: a mux
/// task batches outgoing messages into Elements protos, and a demux task fans
/// incoming Elements out to per-stream AsyncStreams.
actor DataplaneClient {

    /// Key identifying one logical stream: (bundle instruction id, transform id).
    public struct Pair : Hashable {
        let id: String
        let transform: String
    }
    
    /// Messages carried over the data plane.
    public enum Message {
        case data(Data)           // encoded element bytes
        case timer(String,Data)   // (timer family id, encoded timer payload)
        case last(String,String)  // (instruction id, transform id) end-of-stream marker
        case flush                // forces the mux task to send its current batch
    }
    /// Internal envelope tagging a Message with its stream key so the single
    /// shared mux task can route it into the outgoing Elements proto.
    struct Multiplex {
        let id: String
        let transform: String
        let message: Message
    }
    typealias InternalStream = AsyncStream<Multiplex>
    typealias Stream = AsyncStream<Message>
    
    /// Write side handed to producers (e.g. Sink). Every yield is tagged with
    /// this stream's (id, transform) and funneled into the shared mux stream.
    public struct MultiplexContinuation {
        let id: String
        let transform: String
        let base: InternalStream.Continuation
        
        @discardableResult
        func yield(_ value: Message) -> InternalStream.Continuation.YieldResult {
            base.yield(Multiplex(id: id, transform: transform, message: value))
        }
        
        func finish() {
            // Does nothing: the underlying continuation is shared by all logical
            // streams, so finishing it here would terminate every stream.
        }
        

        
    }
    

    private let id: String
    private let log: Logging.Logger
    // Shared (stream, continuation) pair all producers multiplex onto.
    private let multiplex: (InternalStream,InternalStream.Continuation)
    // Lazily-created per-(instruction, transform) demux streams.
    private var streams: [Pair:(Stream,Stream.Continuation,MultiplexContinuation)] = [:]
    // Number of buffered data+timer elements that triggers an automatic send.
    private let flush: Int

    public init(id:String,endpoint:ApiServiceDescriptor,flush:Int=100) throws {
        self.id  = id
        self.log = Logging.Logger(label: "Dataplane(\(id),\(endpoint.url))")
        self.multiplex = AsyncStream.makeStream(of:Multiplex.self)
        self.flush = flush
        // worker_id metadata lets the runner associate this data connection
        // with the correct SDK worker.
        let client = Org_Apache_Beam_Model_FnExecution_V1_BeamFnDataAsyncClient(channel:try GRPCChannelPool.with(endpoint: endpoint, eventLoopGroup: PlatformSupport.makeEventLoopGroup(loopCount: 1)),defaultCallOptions: CallOptions(customMetadata:["worker_id":id]))
        let stream = client.makeDataCall()

        // Mux task: drains the shared multiplex stream, batching messages into
        // an Elements proto that is sent when `flush` elements accumulate or a
        // `.last`/`.flush` message arrives.
        Task {
            log.info("Initiating data plane multiplexing.")
            
            let input = multiplex.0

            var elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
            for try await element in input {
                var shouldFlush: Bool = false
                switch element.message {
                    
                case .data(let payload):
                    elements.data.append(.with {
                        $0.instructionID = element.id
                        $0.transformID = element.transform
                        $0.data = payload
                    })
                case let .timer(family, payload):
                    elements.timers.append(.with {
                        $0.instructionID = element.id
                        $0.transformID = element.transform
                        $0.timerFamilyID = family
                        $0.timers = payload
                    })
                case let .last(id, transform):
                    // End-of-stream is encoded as an empty data element with
                    // isLast set, and forces the batch out immediately.
                    elements.data.append(.with {
                        $0.instructionID = id
                        $0.transformID = transform
                        $0.isLast = true
                    })
                    shouldFlush = true
                case .flush:
                    shouldFlush = true
                }
                if shouldFlush || elements.data.count + elements.timers.count >= flush {
                    do {
                        try await stream.requestStream.send(elements)
                    } catch {
                        // NOTE(review): a failed send drops the whole batch after
                        // logging — confirm that is acceptable for the runner.
                        log.error("Unable to multiplex elements onto data plane: \(error)")
                    }
                    elements = Org_Apache_Beam_Model_FnExecution_V1_Elements()
                    shouldFlush = false // dead store: re-initialized each iteration
                }
            }
        }

        // Demux task: splits each incoming Elements proto by (instruction,
        // transform) and forwards to the matching per-stream continuation.
        Task {
            log.info("Initiating data plane demultiplexing.")
            do {
                
                for try await elements in stream.responseStream {
                    var last: [Pair:Message] = [:] // Split out last calls so they are always at the end
                    var messages: [Pair:[Message]] = [:]
                    
                    for element in elements.data {
                        let key = Pair(id: element.instructionID, transform: element.transformID)
                        // Drop zero-length elements; an empty data element may
                        // exist purely to carry the isLast flag.
                        if element.data.count > 0 {
                            messages[key,default:[]].append(.data(element.data))
                        }
                        if element.isLast {
                            last[key] = .last(element.instructionID, element.transformID)
                        }
                    }
                    
                    for element in elements.timers {
                        let key = Pair(id:element.instructionID,transform:element.transformID)
                        if element.timers.count > 0 {
                            messages[key,default:[]].append(.timer(element.timerFamilyID, element.timers))
                        }
                        if element.isLast {
                            last[key] = .last(element.instructionID, element.transformID)
                        }
                    }
                    
                    // Send the messages to registered sources (streams are
                    // created on demand if no consumer has registered yet).
                    for (key,value) in messages {
                        let output = await self.makeStream(key:key).1
                        for v in value {
                            output.yield(v)
                        }
                    }
                    // Send any last messages after all data for this batch.
                    for (key,value) in last {
                        let output = await self.makeStream(key: key).1
                        output.yield(value)
                    }
                }
            } catch {
                // NOTE(review): the underlying error is discarded; consider
                // logging it and finishing the open per-stream continuations so
                // consumers don't hang.
                log.error("Lost data plane connection.")
            }
            
        }
    }
    
    /// Returns or creates a stream for a particular instruction,transform pair with both the multiplex and demultiplex continuations.
    internal func makeStream(key:Pair) -> (Stream,Stream.Continuation,MultiplexContinuation) {
        if let existing = streams[key] {
            return existing
        }
        let baseStream = AsyncStream.makeStream(of:Message.self)
        let stream = (baseStream.0,baseStream.1,
                      MultiplexContinuation(id: key.id, transform: key.transform, base: multiplex.1))
        streams[key] = stream
        return stream
    }
    
    /// Returns or creates a stream for a particular instruction,transform pair with the multiplex continuation but not the demultiplex,
    /// which mirrors the response from AsyncStream.makeStream.
    public func makeStream(instruction:String,transform:String) -> (Stream,MultiplexContinuation) {
        let key = Pair(id:instruction,transform: transform)
        let (stream,_,continuation) = makeStream(key:key)
        return (stream,continuation)
    }
    
    /// Called by consumers once they have observed `.last` for this stream.
    func finalizeStream(instruction:String,transform:String) {
        //TODO: Implement finalization. Presumably should remove the entry from `streams`.
    }

    
    
    // Process-wide cache of one client per endpoint.
    // NOTE(review): this static dictionary is mutated from a nonisolated static
    // func, outside actor isolation — confirm callers are serialized or protect
    // the cache with its own synchronization.
    private static var dataplanes: [ApiServiceDescriptor:DataplaneClient] = [:]
    public static func client(for endpoint:ApiServiceDescriptor,worker id: String) throws -> DataplaneClient {
        if let client = dataplanes[endpoint] {
            return client
        } else {
            let client = try DataplaneClient(id: id, endpoint: endpoint)
            dataplanes[endpoint] = client
            return client
        }
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 4bbbd44b889..a9c2effee6b 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -1,5 +1,75 @@
+import GRPC
+import NIOCore
+import Logging
+
/// A single SDK harness worker. Connects to the runner's control plane and
/// services ProcessBundle instructions using cached BundleProcessors.
actor Worker {
    private let id: String
    // Pipeline structure used when constructing bundle processors.
    private let collections: [String:AnyPCollection]
    private let fns: [String:SerializableFn]
    // Control plane endpoint supplied by the runner's StartWorkerRequest.
    private let control: ApiServiceDescriptor
    // Remote logging endpoint. NOTE(review): stored but not used anywhere in
    // this actor yet — confirm remote log forwarding is still planned.
    private let remoteLog: ApiServiceDescriptor
    
    private let log: Logging.Logger
    
    public init(id:String,control:ApiServiceDescriptor,log:ApiServiceDescriptor,collections:[String:AnyPCollection],functions: [String:SerializableFn]) {
        self.id = id
        self.collections = collections
        self.fns = functions
        self.control = control
        self.remoteLog = log
        
        self.log = Logging.Logger(label: "Worker(\(id))")
    }
    
    
    
    /// Opens the control-plane call and launches two background tasks: one
    /// forwarding InstructionResponses back to the runner, one servicing
    /// incoming instructions. Returns immediately after launching them.
    public func start() throws {
        let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
        let client = Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel: try GRPCChannelPool.with(endpoint: control, eventLoopGroup: group))
        let (responses,responder) = AsyncStream.makeStream(of:Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse.self)
        // The runner identifies this worker by worker_id metadata.
        let options = CallOptions(customMetadata: ["worker_id":id])
        // Note: this local `control` (the call) shadows the endpoint property.
        let control = client.makeControlCall(callOptions: options)
        
        
        // Start the response task. This forwards every response yielded on
        // `responder` onto the control request stream.
        Task {
            for await r in responses {
                // NOTE(review): a send failure silently ends this task, leaving
                // later responses undelivered — confirm whether it should be
                // logged and/or the worker torn down.
                try await control.requestStream.send(r)
            }
        }
        // Start the actual work task.
        Task {
            log.info("Waiting for control plane instructions.")
            // Processors are cached per descriptor id so repeated bundles reuse
            // the already-constructed processor.
            var processors: [String:BundleProcessor] = [:]
            
            // Returns the cached processor for a descriptor id, fetching the
            // descriptor from the runner and building one on first use.
            func processor(for bundle: String) async throws -> BundleProcessor {
                if let processor = processors[bundle] {
                    return processor
                }
                let descriptor = try await client.getProcessBundleDescriptor(.with { $0.processBundleDescriptorID = bundle })
                let processor = try BundleProcessor(id: id, descriptor: descriptor, collections: collections, fns: fns)
                processors[bundle] = processor
                return processor
            }
            

            // This looks a little bit reversed from the usual because responses
            // don't need an initiating call: the runner drives the conversation.
            for try await instruction in control.responseStream {
                switch instruction.request {
                case .processBundle(let pbr):
                    // The processor yields its InstructionResponse through
                    // `responder`. NOTE(review): bundles are processed serially
                    // in this loop — confirm concurrent bundles aren't required.
                    try await processor(for:pbr.processBundleDescriptorID)
                        .process(instruction: instruction.instructionID,responder:responder)
                    break
                default:
                    // Acknowledge unimplemented instructions with an empty
                    // response carrying the instruction id.
                    log.warning("Ignoring instruction \(instruction.instructionID). Not yet implemented.")
                    log.warning("\(instruction)")
                    responder.yield(.with {
                        $0.instructionID = instruction.instructionID
                    })
                }
            }
            log.info("Control plane connection has closed.")
        }
        
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift 
b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
index e45c3f7eacb..40d7768903f 100644
--- a/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
+++ b/sdks/swift/Sources/ApacheBeam/Runtime/Worker/WorkerProvider.swift
@@ -23,9 +23,12 @@ actor WorkerProvider : 
Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker
     private let log = Logging.Logger(label: "Worker")
     private var workers: [String:Worker] = [:]
 
+    private let collections: [String:AnyPCollection]
     private let functions: [String:SerializableFn]
     
-    init(_ functions: [String:SerializableFn]) {
+    
+    init(_ collections: [String:AnyPCollection],_ functions: 
[String:SerializableFn]) throws {
+        self.collections = collections
         self.functions = functions
     }
     
@@ -35,15 +38,19 @@ actor WorkerProvider : 
Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker
     func startWorker(request: 
Org_Apache_Beam_Model_FnExecution_V1_StartWorkerRequest, context: 
GRPC.GRPCAsyncServerCallContext) async throws -> 
Org_Apache_Beam_Model_FnExecution_V1_StartWorkerResponse {
         log.info("Got request to start worker \(request.workerID)")
         do {
-            if let worker = workers[request.workerID] {
+            if workers[request.workerID] != nil {
                 log.info("Worker \(request.workerID) is already running.")
                 return .with { _ in }
             } else {
-                workers[request.workerID] = try Worker(id: request.workerID)
-            }
-            return .with { _ in
-                
+                let worker = Worker(id: request.workerID,
+                                                       control: 
ApiServiceDescriptor(proto:request.controlEndpoint),
+                                                       
log:ApiServiceDescriptor(proto: request.loggingEndpoint),
+                                                       collections: 
collections,
+                                                       functions: functions)
+                try await worker.start()
+                workers[request.workerID] = worker
             }
+            return .with { _ in }
         } catch {
             log.error("Unable to start worker \(request.workerID): \(error)")
             return .with {
@@ -55,6 +62,22 @@ actor WorkerProvider : 
Org_Apache_Beam_Model_FnExecution_V1_BeamFnExternalWorker
    /// Handles a StopWorkerRequest with an empty response.
    /// NOTE(review): currently a no-op — the worker is neither shut down nor
    /// removed from `workers`; confirm whether cleanup is intended here.
    func stopWorker(request: Org_Apache_Beam_Model_FnExecution_V1_StopWorkerRequest, context: GRPC.GRPCAsyncServerCallContext) async throws -> Org_Apache_Beam_Model_FnExecution_V1_StopWorkerResponse {
        return .with { _ in }
    }
+}
+
/// Hosts a WorkerProvider over an insecure gRPC server so a runner can start
/// loopback workers in this process.
public struct WorkerServer {
    // The bound gRPC server instance.
    private let server: Server

    // Endpoint the runner should use to reach this worker pool, resolved from
    // the actual bound port (port 0 means "pick any free port").
    public let endpoint: ApiServiceDescriptor
    
    /// Builds and binds the server, blocking until the bind completes.
    /// - Throws: if binding fails or the bound local port cannot be determined.
    /// NOTE(review): `.wait()` blocks the calling thread — confirm this is
    /// never invoked from an event-loop thread.
    public init(_ collections: [String:AnyPCollection],_ fns: [String:SerializableFn],host:String="localhost",port:Int=0) throws {
        server = try .insecure(group: PlatformSupport.makeEventLoopGroup(loopCount:1))
            .withServiceProviders([WorkerProvider(collections,fns)])
            .bind(host:host,port:port)
            .wait()
        if let port = server.channel.localAddress?.port {
            endpoint = ApiServiceDescriptor(host: host, port: port)
        } else {
            throw ApacheBeamError.runtimeError("Unable to get server local address port.")
        }
    }
}
diff --git a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift 
b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
index 54c3ae4586e..b0e4d21edcb 100644
--- a/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
+++ b/sdks/swift/Sources/ApacheBeam/Transforms/Basic.swift
@@ -16,6 +16,8 @@ public extension PCollection {
 
 /// Convenience logging mappers
 public extension PCollection {
+    
+    @discardableResult
     func log(prefix:String,name:String = "\(#file):\(#line)") -> 
PCollection<Of> where Of == String {
         pardo(name,prefix) { prefix,input,output in
             for await element in input {
@@ -25,6 +27,7 @@ public extension PCollection {
         }
     }
     
+    @discardableResult
     func log<K,V>(prefix:String,name:String = "\(#file):\(#line)") -> 
PCollection<KV<K,V>> where Of == KV<K,V> {
         pardo(name,prefix) { prefix,input,output in
             for await element in input {
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt
new file mode 100644
index 00000000000..e177e396e79
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file1.txt
@@ -0,0 +1,4 @@
+This file contains some text where we would like to count things
+This is another line
+This is a third line
+Wow there are definitely some lines here.
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt
new file mode 100644
index 00000000000..14bda52d20e
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/Fixtures/file2.txt
@@ -0,0 +1,3 @@
+Another file for testing.
+This one contains different lines from the first file..
+The fact that things are getting called is amazing though!
diff --git a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift 
b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
index 625ba1eafbd..d3743cfbc56 100644
--- a/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
+++ b/sdks/swift/Tests/ApacheBeamTests/Pipeline/IntegrationTests.swift
@@ -33,8 +33,8 @@ final class IntegrationTests: XCTestCase {
     override func tearDownWithError() throws {
     }
 
-    func testPortableWordcount() throws {
-        try Pipeline { pipeline in
+    func testPortableWordcount() async throws {
+        try await Pipeline { pipeline in
             let (contents,errors) = pipeline
                 .create(["file1.txt","file2.txt","missing.txt"])
                 .pardo { filenames,output,errors in
@@ -61,16 +61,17 @@ final class IntegrationTests: XCTestCase {
                 .flatMap({ $0.components(separatedBy: .whitespaces) })
                 .groupBy({ ($0,1) })
                 .sum()
+                .log(prefix:"INTERMEDIATE OUTPUT")
             
             let normalizedCounts = baseCount.groupBy {
                 ($0.key.lowercased().trimmingCharacters(in: 
.punctuationCharacters),
                  $0.value ?? 1)
             }.sum()
             
-            _ = normalizedCounts.log(prefix:"COUNT OUTPUT")
-            _ = errors.log(prefix:"ERROR OUTPUT")
+            normalizedCounts.log(prefix:"COUNT OUTPUT")
+            errors.log(prefix:"ERROR OUTPUT")
             
-        }.run()
+        }.run(PortableRunner(loopback:true)) 
         
         
     }

Reply via email to