This is an automated email from the ASF dual-hosted git repository.
bce pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-swift.git
The following commit(s) were added to refs/heads/main by this push:
new f4c12bb Fix up worker id to work with latest version of Prism
f4c12bb is described below
commit f4c12bb85c35e3dd15576d989a4e1cf3385dc7fd
Author: Byron Ellis <[email protected]>
AuthorDate: Wed Jul 9 18:45:05 2025 -0700
Fix up worker id to work with latest version of Prism
---
Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift | 4 ++--
Sources/ApacheBeam/Runtime/Worker/Worker.swift | 5 ++---
Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift | 4 ++--
3 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index b2e2542..405f863 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -35,7 +35,7 @@ struct BundleProcessor {
}
let steps: [Step]
- let (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of:
MetricCommand.self)
+ var (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of:
MetricCommand.self)
init(id: String,
descriptor:
Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
@@ -122,7 +122,7 @@ struct BundleProcessor {
// Start metric handling. This should complete after the group
Task {
- var reporter = await accumulator.reporter
+ let reporter = await accumulator.reporter
log.info("Monitoring bundle metrics for \(instruction)")
for await command in bundleMetrics {
switch command {
diff --git a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index a48e93a..68f8fc8 100644
--- a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -47,10 +47,9 @@ actor Worker {
public func start() throws {
let group = PlatformSupport.makeEventLoopGroup(loopCount: 1)
- let client = try
Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel:
GRPCChannelPool.with(endpoint: control, eventLoopGroup: group))
+ let client = try
Org_Apache_Beam_Model_FnExecution_V1_BeamFnControlAsyncClient(channel:
GRPCChannelPool.with(endpoint: control, eventLoopGroup:
group),defaultCallOptions: CallOptions(customMetadata: ["worker_id": id]))
let (responses, responder) = AsyncStream.makeStream(of:
Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse.self)
- let options = CallOptions(customMetadata: ["worker_id": id])
- let control = client.makeControlCall(callOptions: options)
+ let control = client.makeControlCall()
// Start the response task. This will continue until a yield call is
sent from responder
Task {
diff --git a/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
b/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
index 7700cb9..84fe756 100644
--- a/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
+++ b/Tests/ApacheBeamTests/CrossLanguage/ExpansionServiceTests.swift
@@ -24,11 +24,11 @@ final class ExpansionServiceTests: XCTestCase {
override func tearDownWithError() throws {}
- func testConnectExpansionService() async throws {
+ /*func testConnectExpansionService() async throws {
let client = try ExpansionClient(endpoint: .init(host: "localhost",
port: 8097))
let transforms = try await client.transforms()
for t in transforms {
print("\(t)")
}
- }
+ }*/
}