This is an automated email from the ASF dual-hosted git repository.

bce pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new f4c12bb  Fix up worker id to work with latest version of Prism
f4c12bb is described below

commit f4c12bb85c35e3dd15576d989a4e1cf3385dc7fd
Author: Byron Ellis <[email protected]>
AuthorDate: Wed Jul 9 18:45:05 2025 -0700

    Fix up worker id to work with latest version of Prism
---
 Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift         | 4 ++--
 Sources/ApacheBeam/Runtime/Worker/Worker.swift                  | 5 ++---
 Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift | 4 ++--
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index b2e2542..405f863 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -35,7 +35,7 @@ struct BundleProcessor {
     }
 
     let steps: [Step]
-    let (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of: MetricCommand.self)
+    var (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of: MetricCommand.self)
 
     init(id: String,
          descriptor: Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
@@ -122,7 +122,7 @@ struct BundleProcessor {
         
         // Start metric handling. This should complete after the group
         Task {
-            var reporter = await accumulator.reporter
+            let reporter = await accumulator.reporter
             log.info("Monitoring bundle metrics for \(instruction)")
             for await command in bundleMetrics {
                 switch command {
diff --git a/Sources/ApacheBeam/Runtime/Worker/Worker.swift b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index a48e93a..68f8fc8 100644
--- a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -47,10 +47,9 @@ actor Worker {
 
     public func start() throws {
         let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
-        let client = try Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel: GRPCChannelPool.with(endpoint: control, eventLoopGroup: group))
+        let client = try Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel: GRPCChannelPool.with(endpoint: control, eventLoopGroup: group),defaultCallOptions: CallOptions(customMetadata: ["worker_id": id]))
         let (responses, responder) = AsyncStream.makeStream(of: Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse.self)
-        let options = CallOptions(customMetadata: ["worker_id": id])
-        let control = client.makeControlCall(callOptions: options)
+        let control = client.makeControlCall()
 
         // Start the response task. This will continue until a yield call is sent from responder
         Task {
diff --git a/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift b/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
index 7700cb9..84fe756 100644
--- a/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
+++ b/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
@@ -24,11 +24,11 @@ final class ExpansionServiceTests: XCTestCase {
 
     override func tearDownWithError() throws {}
 
-    func testConnectExpansionService() async throws {
+    /*func testConnectExpansionService() async throws {
         let client = try ExpansionClient(endpoint: .init(host: "localhost", port: 8097))
         let transforms = try await client.transforms()
         for t in transforms {
             print("\(t)")
         }
-    }
+    }*/
 }

Reply via email to