This is an automated email from the ASF dual-hosted git repository.
bce pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 4b69cc0 Add metrics support for Prism compatibility
4b69cc0 is described below
commit 4b69cc0974681f0a723035c6ea7547b6f5e875ae
Author: Byron Ellis <[email protected]>
AuthorDate: Wed Jul 9 09:21:51 2025 -0700
Add metrics support for Prism compatibility
---
Sources/ApacheBeam/Core/Fn/SerializableFn.swift | 1 +
.../Core/PCollection/AnyPCollection.swift | 10 +--
.../ApacheBeam/Core/PCollection/PCollection.swift | 9 ++-
.../Core/PCollection/PCollectionStream.swift | 5 +-
Sources/ApacheBeam/Runners/PortableRunner.swift | 2 +-
.../Runtime/Bundle/BundleProcessor.swift | 61 ++++++++++++++---
Sources/ApacheBeam/Runtime/Bundle/Sink.swift | 10 ++-
Sources/ApacheBeam/Runtime/Bundle/Source.swift | 13 +++-
.../Runtime/Metrics/MetricAccumulator.swift | 78 ++++++++++++++++++++++
.../Runtime/Metrics/MetricReporter.swift | 57 +++++++++++++++-
.../Runtime/Metrics/MetricsRegistry.swift | 64 ++++++++++++++++++
.../Runtime/Metrics/ReportableMetric.swift | 56 ++++++++++++++++
Sources/ApacheBeam/Runtime/Worker/Worker.swift | 58 +++++++++++++++-
.../ApacheBeam/Testing/PCollection+Testing.swift | 2 +-
Sources/ApacheBeam/Testing/PCollectionTest.swift | 6 +-
Tests/ApacheBeamTests/Pipeline/FileIOTests.swift | 1 -
16 files changed, 400 insertions(+), 33 deletions(-)
diff --git a/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
b/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index 9b8cbc2..6fe881e 100644
--- a/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
+++ b/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -23,6 +23,7 @@ public struct SerializableFnBundleContext {
let instruction: String
let transform: String
let payload: Data
+ let metrics: MetricReporter
let log: Logging.Logger
}
diff --git a/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
b/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 4bf88c5..293ab72 100644
--- a/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -25,7 +25,7 @@ public struct AnyPCollection: PCollectionProtocol,
PipelineMember {
let applyClosure: (Any, PipelineTransform) -> PipelineTransform
let consumersClosure: (Any) -> [PipelineTransform]
let coderClosure: (Any) -> Coder
- let streamClosure: (Any) -> AnyPCollectionStream
+ let streamClosure: (Any,@escaping ElementReporter) -> AnyPCollectionStream
let rootsClosure: (Any) -> [PCollection<Never>]
let streamTypeClosure: (Any) -> StreamType
@@ -40,7 +40,7 @@ public struct AnyPCollection: PCollectionProtocol,
PipelineMember {
applyClosure = { ($0 as! C).apply($1) }
consumersClosure = { ($0 as! C).consumers }
coderClosure = { ($0 as! C).coder }
- streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
+ streamClosure = { AnyPCollectionStream(($0 as! C).stream($1)) }
parentClosure = { ($0 as! C).parent }
rootsClosure = { ($0 as! PipelineMember).roots }
streamTypeClosure = { ($0 as! C).streamType }
@@ -64,7 +64,7 @@ public struct AnyPCollection: PCollectionProtocol,
PipelineMember {
coderClosure(collection)
}
- public var stream: PCollectionStream<Never> {
+ public func stream(_ reporter: @escaping ElementReporter) ->
PCollectionStream<Never> {
fatalError("Do not use `stream` on AnyPCollection. Use `anyStream`
instead.")
}
@@ -72,8 +72,8 @@ public struct AnyPCollection: PCollectionProtocol,
PipelineMember {
streamTypeClosure(collection)
}
- public var anyStream: AnyPCollectionStream {
- streamClosure(collection)
+ public func anyStream(_ reporter: @escaping ElementReporter) ->
AnyPCollectionStream {
+ streamClosure(collection,reporter)
}
var roots: [PCollection<Never>] {
diff --git a/Sources/ApacheBeam/Core/PCollection/PCollection.swift
b/Sources/ApacheBeam/Core/PCollection/PCollection.swift
index d5c0dcc..02e836c 100644
--- a/Sources/ApacheBeam/Core/PCollection/PCollection.swift
+++ b/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -26,6 +26,8 @@ public enum WindowingStrategy {
case unspecified
}
+public typealias ElementReporter = (Int,Int?) -> Void
+
public protocol PCollectionProtocol {
associatedtype Of
@@ -34,11 +36,12 @@ public protocol PCollectionProtocol {
var parent: PipelineTransform? { get }
var consumers: [PipelineTransform] { get }
var coder: Coder { get }
- var stream: Stream { get }
var streamType: StreamType { get }
@discardableResult
func apply(_ transform: PipelineTransform) -> PipelineTransform
+
+ func stream(_ elements: @escaping ElementReporter) -> Stream
}
public final class PCollection<Of>: PCollectionProtocol {
@@ -62,8 +65,8 @@ public final class PCollection<Of>: PCollectionProtocol {
streamType = type
}
- public var stream: PCollectionStream<Of> {
- staticStream ?? PCollectionStream<Of>()
+ public func stream(_ reporter: @escaping ElementReporter) ->
PCollectionStream<Of> {
+ return staticStream ?? PCollectionStream<Of>(reporter)
}
@discardableResult
diff --git a/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
b/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index 76b63ac..1fd8cc3 100644
--- a/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -24,9 +24,11 @@ public final class PCollectionStream<Of>: AsyncSequence {
private let stream: AsyncStream<Element>
private let emitter: AsyncStream<Element>.Continuation
+ private let reporter: ElementReporter
- public init() {
+ public init(_ reporter: @escaping ElementReporter) {
(stream, emitter) = AsyncStream.makeStream(of: Element.self)
+ self.reporter = reporter
}
public func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
@@ -38,6 +40,7 @@ public final class PCollectionStream<Of>: AsyncSequence {
}
public func emit(_ value: Element) {
+ reporter(1,nil)
emitter.yield(value)
}
diff --git a/Sources/ApacheBeam/Runners/PortableRunner.swift
b/Sources/ApacheBeam/Runners/PortableRunner.swift
index f4591e2..84f8738 100644
--- a/Sources/ApacheBeam/Runners/PortableRunner.swift
+++ b/Sources/ApacheBeam/Runners/PortableRunner.swift
@@ -64,7 +64,7 @@ public struct PortableRunner: PipelineRunner {
})
log.info("Job \(job.jobID) status: \(status.state)")
switch status.state {
- case .stopped, .failed, .done:
+ case .stopped, .failed, .done, .cancelled:
done = true
default:
try await Task.sleep(for: .seconds(5))
diff --git a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 413186b..b2e2542 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -20,6 +20,9 @@ import Logging
import Foundation
+typealias ProgressUpdater =
(AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
-> Void
+
+
struct BundleProcessor {
let log: Logging.Logger
@@ -32,28 +35,38 @@ struct BundleProcessor {
}
let steps: [Step]
+ let (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of:
MetricCommand.self)
init(id: String,
descriptor:
Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
collections: [String: AnyPCollection],
- fns: [String: SerializableFn]) throws
+ fns: [String: SerializableFn],
+ registry: MetricsRegistry) async throws
{
log = Logging.Logger(label: "BundleProcessor(\(id) \(descriptor.id))")
var temp: [Step] = []
- var coders = BundleCoderContainer(bundle: descriptor)
+ let coders = BundleCoderContainer(bundle: descriptor)
var streams: [String: AnyPCollectionStream] = [:]
// First make streams for everything in this bundle (maybe I could use
the pcollection array for this?)
for (_, transform) in descriptor.transforms {
for id in transform.inputs.values {
if streams[id] == nil {
- streams[id] = collections[id]!.anyStream
+ let metrics = MetricReporter(registry: registry, reporter:
bundleMetricsReporter,transform: transform.uniqueName, pcollection: id)
+ let elementsRead = await metrics.elementCount(name:
"total-elements-read")
+ streams[id] = collections[id]!.anyStream({ count,_ in
+ elementsRead(count)
+ })
}
}
for id in transform.outputs.values {
if streams[id] == nil {
- streams[id] = collections[id]!.anyStream
+ let metrics = MetricReporter(registry: registry, reporter:
bundleMetricsReporter,transform: transform.uniqueName, pcollection: id)
+ let elementsWritten = await metrics.elementCount(name:
"total-elements-written")
+ streams[id] = collections[id]!.anyStream({ count,_ in
+ elementsWritten(count)
+ })
}
}
}
@@ -104,15 +117,33 @@ struct BundleProcessor {
}
steps = temp
}
-
- public func process(instruction: String, responder:
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
async {
+
+ public func process(instruction: String, accumulator: MetricAccumulator,
responder:
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
async {
+
+        // Start metric handling. This should complete after the task group finishes.
+ Task {
+ var reporter = await accumulator.reporter
+ log.info("Monitoring bundle metrics for \(instruction)")
+ for await command in bundleMetrics {
+ switch command {
+ case .update(_, _):
+ reporter.yield(command)
+ case .report(_):
+ continue
+ case .finish(_):
+ log.info("Done monitoring bundle metrics for
\(instruction)")
+ return
+ }
+ }
+ }
+
_ = await withThrowingTaskGroup(of: (String, String).self) { group in
log.info("Starting bundle processing for \(instruction)")
var count: Int = 0
do {
for step in steps {
log.info("Starting Task \(step.transformId)")
- let context = SerializableFnBundleContext(instruction:
instruction, transform: step.transformId, payload: step.payload, log: log)
+ let context = SerializableFnBundleContext(instruction:
instruction, transform: step.transformId, payload: step.payload, metrics: await
MetricReporter(accumulator: accumulator, transform: step.transformId), log: log)
group.addTask {
try await step.fn.process(context: context, inputs:
step.inputs, outputs: step.outputs)
}
@@ -123,10 +154,18 @@ struct BundleProcessor {
finished += 1
log.info("Task Completed (\(instruction),\(transform))
\(finished) of \(count)")
}
- log.info("All tasks completed for \(instruction)")
- responder.yield(.with {
- $0.instructionID = instruction
- })
+ bundleMetricsReporter.yield(.finish({ _,_ in }))
+ await accumulator.reporter.yield(.finish({
metricInfo,metricData in
+ log.info("All tasks completed for \(instruction)")
+ responder.yield(.with {
+ $0.instructionID = instruction
+ $0.processBundle = .with {
+ $0.monitoringData.merge(metricData,
uniquingKeysWith: {a,b in b})
+ $0.monitoringInfos.append(contentsOf: metricInfo)
+ $0.requiresFinalization = true
+ }
+ })
+ }));
} catch {
responder.yield(.with {
$0.instructionID = instruction
diff --git a/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
b/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
index c719d45..58c92cc 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -21,7 +21,7 @@ import Foundation
final class Sink: SerializableFn {
let client: DataplaneClient
let coder: Coder
-
+
public init(client: DataplaneClient, coder: Coder) {
self.client = client
self.coder = coder
@@ -30,12 +30,20 @@ final class Sink: SerializableFn {
func process(context: SerializableFnBundleContext,
inputs: [AnyPCollectionStream], outputs _:
[AnyPCollectionStream]) async throws -> (String, String)
{
+ let bytesWritten = await context.metrics.counter(name: "bytes-written")
+ let recordsWritten = await context.metrics.counter(name:
"records-written")
let (_, emitter) = await client.makeStream(instruction:
context.instruction, transform: context.transform)
+ var bytes = 0
+ var records = 0
for try await element in inputs[0] {
var output = Data()
try coder.encode(element, data: &output)
+ bytes += output.count
+ records += 1
emitter.yield(.data(output))
}
+ bytesWritten(bytes)
+ recordsWritten(records)
emitter.yield(.last(context.instruction, context.transform))
emitter.finish()
await client.finalizeStream(instruction: context.instruction,
transform: context.transform)
diff --git a/Sources/ApacheBeam/Runtime/Bundle/Source.swift
b/Sources/ApacheBeam/Runtime/Bundle/Source.swift
index adb6885..f752d47 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/Source.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -24,7 +24,10 @@ import Logging
final class Source: SerializableFn {
let client: DataplaneClient
let coder: Coder
+
let log: Logger
+
+
public init(client: DataplaneClient, coder: Coder) {
self.client = client
@@ -36,6 +39,8 @@ final class Source: SerializableFn {
inputs _: [AnyPCollectionStream], outputs:
[AnyPCollectionStream]) async throws -> (String, String)
{
log.info("Waiting for input on
\(context.instruction)-\(context.transform)")
+ let bytesRead = await context.metrics.counter(name: "bytes-read")
+ let recordsRead = await context.metrics.counter(name: "records-read")
let (stream, _) = await client.makeStream(instruction:
context.instruction, transform: context.transform)
var messages = 0
@@ -45,13 +50,19 @@ final class Source: SerializableFn {
switch message {
case let .data(data):
var d = data
+ let totalBytes = data.count
+ var localCount = 0
while d.count > 0 {
let value = try coder.decode(&d)
for output in outputs {
try output.emit(value: value)
- count += 1
}
+ count += 1
+ localCount += 1
}
+ bytesRead(totalBytes - d.count)
+ recordsRead(localCount)
+
case let .last(id, transform):
for output in outputs {
output.finish()
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift
b/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift
new file mode 100644
index 0000000..e140847
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift
@@ -0,0 +1,78 @@
+//
+// File.swift
+//
+//
+// Created by Byron Ellis on 9/2/24.
+//
+
+import Foundation
+import Logging
+
+typealias MetricDataFn =
([Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo],Dictionary<String,Data>) ->
Void
+
+enum MetricCommand {
+ case update(String,ReportableMetric)
+ case report(MetricDataFn)
+ case finish(MetricDataFn)
+}
+
+typealias MetricStream = AsyncStream<MetricCommand>
+typealias MetricStreamReporter = MetricStream.Continuation
+
+public actor MetricAccumulator {
+ let log: Logging.Logger
+
+ var registry: MetricsRegistry
+ var values: [String:ReportableMetric] = [:]
+ var stream:MetricStream
+ var _reporter:MetricStreamReporter
+
+ var reporter: MetricStreamReporter { get {
+ return _reporter
+ }}
+
+ public init(instruction: String, registry: MetricsRegistry) {
+ self.registry = registry
+ (stream,_reporter) = AsyncStream.makeStream(of:MetricCommand.self)
+ self.log = Logging.Logger(label: "Accumulator(\(instruction))")
+ }
+
+ func reportMetrics(_ to:MetricDataFn) async {
+ var info: [Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo] = []
+ var outputs: [String:Data] = [:]
+ for (mId,value) in values {
+ do {
+ let payload = try value.encode()
+ info.append(await registry.monitoringInfo(mId, payload:
payload))
+ outputs[mId] = payload
+ } catch {
+ log.error("Unable to write metric \(mId): \(error)")
+ }
+ }
+ to(info,outputs)
+ }
+
+ public func start() {
+ Task {
+ log.info("Starting metric command processing.")
+ for await cmd in stream {
+ switch cmd {
+ case let .update(mId, value):
+ if let o = values[mId] {
+ values.updateValue(o.merge(other: value), forKey: mId)
+ } else {
+ values[mId] = value
+ }
+ case let .report(to):
+ await reportMetrics(to)
+ case let .finish(to):
+ await reportMetrics(to)
+ values.removeAll()
+ break
+ }
+ }
+ log.info("Shutting down metric command processing.")
+ }
+ }
+
+}
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
b/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
index fb5b59e..93de51f 100644
--- a/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
@@ -16,5 +16,58 @@
* limitations under the License.
*/
-/// Background actor that handles reporting metrics to the backend. Allows the
metrics implementation to be asynchronous.
-actor MetricReporter {}
+
+public typealias Counter = (Int) -> Void
+public typealias Distribution = (Int) -> Void
+
+public struct MetricReporter {
+
+ let registry: MetricsRegistry
+ let reporter: MetricStreamReporter
+ let transform : String
+ let pcollection: String
+
+ init(registry: MetricsRegistry,reporter: MetricStreamReporter, transform:
String = "",pcollection: String = "") {
+ self.registry = registry
+ self.reporter = reporter
+ self.transform = transform
+ self.pcollection = pcollection
+ }
+
+ init(accumulator: MetricAccumulator, transform: String = "", pcollection:
String = "") async {
+ self.init(registry: await accumulator.registry, reporter: await
accumulator.reporter,transform: transform, pcollection: pcollection)
+ }
+
+ public func register(_ name: String,namespace:String = "",transform:String
= "", pcollection:String = "",initialValue: ReportableMetric) async -> String {
+ return await
registry.register(name,urn:initialValue.urn,type:initialValue.type,namespace:namespace,transform:transform,pcollection:pcollection)
+ }
+
+ public func counter(name: String, namespace: String = "", pcollection:
String? = nil) async -> Counter {
+ let value:ReportableMetric = .counter(0)
+ let metricId = await register(name,namespace: namespace,transform:
transform, pcollection: pcollection ?? self.pcollection, initialValue: value)
+ reporter.yield(.update(metricId, value))
+ return { update in
+ reporter.yield(.update(metricId, .counter(update)))
+ }
+ }
+ public func elementCount(name:String, namespace: String =
"",transform:String? = nil,pcollection: String? = nil) async -> Counter {
+ let value: ReportableMetric = .counter(0)
+ let metricId = await
registry.register(name,urn:"beam:metric:element_count:v1",type:"beam:metrics:sum_int64:v1",
+ transform: transform ??
self.transform,pcollection: pcollection ?? self.pcollection)
+ reporter.yield(.update(metricId, value))
+ return { update in
+ reporter.yield(.update(metricId, .counter(update)))
+ }
+ }
+
+ public func distribution(name : String, namespace: String = "",
pcollection: String? = nil) async -> Distribution {
+ let value:ReportableMetric = .distribution(0,0,Int.max,Int.min)
+ let metricId = await register(name,namespace: namespace,transform:
transform, pcollection: pcollection ?? self.pcollection, initialValue: value)
+ reporter.yield(.update(metricId, value))
+ return { update in
+ reporter.yield(.update(metricId,
.distribution(1,update,update,update)))
+ }
+ }
+}
+
+
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift
b/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift
new file mode 100644
index 0000000..40d8db9
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift
@@ -0,0 +1,64 @@
+//
+// File.swift
+//
+//
+// Created by Byron Ellis on 9/3/24.
+//
+
+import Foundation
+
+public struct MetricDefinition {
+ let id: String
+ let name: String
+ let labels: [String:String]
+ let urn: String
+ let type: String
+}
+
+
+public actor MetricsRegistry {
+
+ var definitions: [String:MetricDefinition] = [:]
+ var names: [String:String] = [:]
+ var id: Int = 1
+
+ public func register(_ name:
String,urn:String,type:String,namespace:String = "",transform:String =
"",pcollection:String = "") -> String {
+ let fullName = "\(transform)-\(namespace)-\(name)"
+ if let metricId = names[fullName] {
+ return metricId
+ } else {
+ let metricId = "m\(id)"
+ id = id + 1
+ var l:[String:String] = [:]
+ l["NAME"] = name
+ l["NAMESPACE"] = namespace
+
+ if transform != "" {
+ l["TRANSFORM"] = transform
+ }
+ if pcollection != "" {
+ l["PCOLLECTION"] = pcollection
+ }
+ definitions[metricId] = MetricDefinition(id: metricId, name:
fullName, labels: l, urn: urn, type: type)
+ return metricId
+ }
+ }
+
+
+
+}
+
+extension MetricsRegistry {
+ func monitoringInfo(_ id: String,payload: Data? = nil) ->
Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo {
+ return .with {
+ if let def = definitions[id] {
+ $0.type = def.type
+ $0.urn = def.urn
+ $0.labels.merge(def.labels, uniquingKeysWith: { $1 })
+ }
+ if let p = payload {
+ $0.payload = p
+ }
+ }
+ }
+}
diff --git a/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift
b/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift
new file mode 100644
index 0000000..724b017
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift
@@ -0,0 +1,56 @@
+import Foundation
+
+public enum ReportableMetric {
+ case counter(Int)
+ case distribution(Int,Int,Int,Int)
+
+ public func merge(other:ReportableMetric) -> ReportableMetric {
+ switch (self,other) {
+ case let (.counter(val),.counter(other_val)):
+ return .counter(val+other_val)
+ case let (.counter(val),.distribution(count, sum, min, max)):
+ return .counter(val + sum)
+ case let (.distribution(count,sum,min,max),.distribution(other_count,
other_sum, other_min, other_max)):
+ return .distribution(count+other_count, sum+other_sum, min <
other_min ? min : other_min, max > other_max ? max : other_max)
+ case let (.distribution(count,sum,min,max),.counter(val)):
+ return .distribution(count+1, sum+val, min < val ? min : val, max
> val ? max : val)
+ }
+ }
+ public var type: String { get {
+ switch self {
+ case .counter(_):
+ return "beam:metrics:sum_int64:v1"
+ case .distribution(_, _, _, _):
+ return "beam:metrics:distribution_int64:v1"
+ }
+ }}
+
+ public var urn: String { get {
+ switch self {
+ case .counter(_):
+ return "beam:metric:user:sum_int64:v1"
+ case .distribution(_, _, _, _):
+ return "beam:metric:user:distribution_int64:v1"
+ }
+ }}
+
+ public func update(val:Int) -> ReportableMetric {
+ switch self {
+ case let .counter(current):
+ return .counter(current+val)
+ case let .distribution(count, sum, min, max):
+ return .distribution(count+1, sum+val, min < val ? min : val, max
> val ? max : val)
+ }
+ }
+
+
+ public func encode() throws -> Data {
+ switch self {
+ case let .counter(c):
+ return try Coder.varint.encode(c)
+ case let .distribution(count, sum, min, max):
+ return try Coder.iterable(Coder.varint).encode([count,sum,min,max])
+ }
+ }
+}
+
diff --git a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 3588770..a48e93a 100644
--- a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -20,6 +20,10 @@ import GRPC
import Logging
import NIOCore
+typealias InstructionResponder =
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation
+
+
+
actor Worker {
private let id: String
private let collections: [String: AnyPCollection]
@@ -28,6 +32,7 @@ actor Worker {
private let remoteLog: ApiServiceDescriptor
private let log: Logging.Logger
+ private let registry: MetricsRegistry
public init(id: String, control: ApiServiceDescriptor, log:
ApiServiceDescriptor, collections: [String: AnyPCollection], functions:
[String: SerializableFn]) {
self.id = id
@@ -37,6 +42,7 @@ actor Worker {
remoteLog = log
self.log = Logging.Logger(label: "Worker(\(id))")
+ self.registry = MetricsRegistry()
}
public func start() throws {
@@ -49,37 +55,83 @@ actor Worker {
// Start the response task. This will continue until a yield call is
sent from responder
Task {
for await r in responses {
- log.info("Sending response \(r)")
+// log.info("Sending response \(r)")
try await control.requestStream.send(r)
}
}
+
// Start the actual work task
Task {
log.info("Waiting for control plane instructions.")
var processors: [String: BundleProcessor] = [:]
+ var metrics: [String: MetricAccumulator] = [:]
func processor(for bundle: String) async throws -> BundleProcessor
{
if let processor = processors[bundle] {
return processor
}
let descriptor = try await
client.getProcessBundleDescriptor(.with { $0.processBundleDescriptorID = bundle
})
- let processor = try BundleProcessor(id: id, descriptor:
descriptor, collections: collections, fns: fns)
+ let processor = try await BundleProcessor(id: id, descriptor:
descriptor, collections: collections, fns: fns, registry: registry)
processors[bundle] = processor
return processor
}
// This looks a little bit reversed from the usual because
responses don't need an initiating call
for try await instruction in control.responseStream {
+// log.info("\(instruction)")
switch instruction.request {
case let .processBundle(pbr):
do {
let p = try await processor(for:
pbr.processBundleDescriptorID)
+ let accumulator = MetricAccumulator(instruction:
instruction.instructionID, registry: registry)
+ await accumulator.start()
+ metrics[instruction.instructionID] = accumulator
Task {
- await p.process(instruction:
instruction.instructionID, responder: responder)
+ await p.process(instruction:
instruction.instructionID,accumulator: accumulator, responder: responder)
}
} catch {
log.error("Unable to process bundle
\(pbr.processBundleDescriptorID): \(error)")
}
+ case let .processBundleProgress(pbpr):
+// log.info("Requesting bundle progress of
\(pbpr.instructionID)")
+ if let accumulator = metrics[pbpr.instructionID] {
+ await accumulator.reporter.yield(.report({ metricInfo,
metricData in
+ responder.yield(.with {
+ $0.instructionID = instruction.instructionID
+ $0.processBundleProgress = .with {
+ $0.monitoringData.merge(metricData,
uniquingKeysWith: { $1 })
+ $0.monitoringInfos.append(contentsOf:
metricInfo)
+ }
+ })
+ }))
+ }
+ case let .processBundleSplit(pbsr):
+ log.info("Requesting bundle split for
\(pbsr.instructionID)")
+ responder.yield(.with {
+ $0.instructionID = instruction.instructionID
+ $0.processBundleSplit = .with { _ in }
+ })
+ case let .finalizeBundle(fbr):
+ log.info("Finializing bundle \(fbr.instructionID)")
+ metrics.removeValue(forKey: fbr.instructionID)
+ responder.yield(.with {
+ $0.instructionID = fbr.instructionID
+ $0.finalizeBundle = .with { _ in }
+ })
+ case let .monitoringInfos(mimr):
+ log.info("Requesting monitoring information")
+ var tmp:
[String:Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo] = [:]
+ for id in mimr.monitoringInfoID {
+ tmp[id] = await registry.monitoringInfo(id)
+ }
+ log.info("\(tmp)")
+ responder.yield(.with {
+ $0.instructionID = instruction.instructionID
+ $0.monitoringInfos = .with {
+ $0.monitoringInfo.merge(tmp, uniquingKeysWith: {
$1 })
+ }
+ })
+
default:
log.warning("Ignoring instruction
\(instruction.instructionID). Not yet implemented.")
log.warning("\(instruction)")
diff --git a/Sources/ApacheBeam/Testing/PCollection+Testing.swift
b/Sources/ApacheBeam/Testing/PCollection+Testing.swift
index d0dc11f..f52ee30 100644
--- a/Sources/ApacheBeam/Testing/PCollection+Testing.swift
+++ b/Sources/ApacheBeam/Testing/PCollection+Testing.swift
@@ -21,7 +21,7 @@ import Foundation
public extension PCollection {
/// Create a PCollection whose stream has been preloaded with some values
for testing
static func testValues<V: Beamable>(_ values: [V]) -> PCollection<V> {
- let stream = PCollectionStream<V>()
+ let stream = PCollectionStream<V>({ _,_ in })
for v in values {
stream.emit(v, timestamp: .now, window: .global)
}
diff --git a/Sources/ApacheBeam/Testing/PCollectionTest.swift
b/Sources/ApacheBeam/Testing/PCollectionTest.swift
index 641656a..f9e9520 100644
--- a/Sources/ApacheBeam/Testing/PCollectionTest.swift
+++ b/Sources/ApacheBeam/Testing/PCollectionTest.swift
@@ -33,9 +33,9 @@ public struct PCollectionTest {
if let transform = output.parent {
switch transform {
case let .pardo(parent, _, fn, outputs):
- let context = try SerializableFnBundleContext(instruction:
"1", transform: "test", payload: fn.payload, log: log)
- let input = parent.anyStream
- let streams = outputs.map(\.anyStream)
+ let context = try SerializableFnBundleContext(instruction:
"1", transform: "test", payload: fn.payload, metrics: await
MetricReporter(accumulator:
MetricAccumulator(instruction:"inst000",registry:MetricsRegistry())), log: log)
+ let input = parent.anyStream({ _,_ in })
+ let streams = outputs.map({ $0.anyStream({ _,_ in }) })
try await withThrowingTaskGroup(of: Void.self) { group in
log.info("Starting process task")
diff --git a/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
b/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index 8ee73d2..0b2eb35 100644
--- a/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -58,7 +58,6 @@ final class FileIOTests: XCTestCase {
}
func testShakespeareWordcount() async throws {
- throw XCTSkip()
try await Pipeline { pipeline in
let contents = pipeline
.create(["dataflow-samples/shakespeare"])