This is an automated email from the ASF dual-hosted git repository.

bce pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/beam-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b69cc0  Add metrics support for Prism compatibility
4b69cc0 is described below

commit 4b69cc0974681f0a723035c6ea7547b6f5e875ae
Author: Byron Ellis <[email protected]>
AuthorDate: Wed Jul 9 09:21:51 2025 -0700

    Add metrics support for Prism compatibility
---
 Sources/ApacheBeam/Core/Fn/SerializableFn.swift    |  1 +
 .../Core/PCollection/AnyPCollection.swift          | 10 +--
 .../ApacheBeam/Core/PCollection/PCollection.swift  |  9 ++-
 .../Core/PCollection/PCollectionStream.swift       |  5 +-
 Sources/ApacheBeam/Runners/PortableRunner.swift    |  2 +-
 .../Runtime/Bundle/BundleProcessor.swift           | 61 ++++++++++++++---
 Sources/ApacheBeam/Runtime/Bundle/Sink.swift       | 10 ++-
 Sources/ApacheBeam/Runtime/Bundle/Source.swift     | 13 +++-
 .../Runtime/Metrics/MetricAccumulator.swift        | 78 ++++++++++++++++++++++
 .../Runtime/Metrics/MetricReporter.swift           | 57 +++++++++++++++-
 .../Runtime/Metrics/MetricsRegistry.swift          | 64 ++++++++++++++++++
 .../Runtime/Metrics/ReportableMetric.swift         | 56 ++++++++++++++++
 Sources/ApacheBeam/Runtime/Worker/Worker.swift     | 58 +++++++++++++++-
 .../ApacheBeam/Testing/PCollection+Testing.swift   |  2 +-
 Sources/ApacheBeam/Testing/PCollectionTest.swift   |  6 +-
 Tests/ApacheBeamTests/Pipeline/FileIOTests.swift   |  1 -
 16 files changed, 400 insertions(+), 33 deletions(-)

diff --git a/Sources/ApacheBeam/Core/Fn/SerializableFn.swift 
b/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
index 9b8cbc2..6fe881e 100644
--- a/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
+++ b/Sources/ApacheBeam/Core/Fn/SerializableFn.swift
@@ -23,6 +23,7 @@ public struct SerializableFnBundleContext {
     let instruction: String
     let transform: String
     let payload: Data
+    let metrics: MetricReporter
     let log: Logging.Logger
 }
 
diff --git a/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift 
b/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
index 4bf88c5..293ab72 100644
--- a/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
+++ b/Sources/ApacheBeam/Core/PCollection/AnyPCollection.swift
@@ -25,7 +25,7 @@ public struct AnyPCollection: PCollectionProtocol, 
PipelineMember {
     let applyClosure: (Any, PipelineTransform) -> PipelineTransform
     let consumersClosure: (Any) -> [PipelineTransform]
     let coderClosure: (Any) -> Coder
-    let streamClosure: (Any) -> AnyPCollectionStream
+    let streamClosure: (Any,@escaping ElementReporter) -> AnyPCollectionStream
     let rootsClosure: (Any) -> [PCollection<Never>]
     let streamTypeClosure: (Any) -> StreamType
 
@@ -40,7 +40,7 @@ public struct AnyPCollection: PCollectionProtocol, 
PipelineMember {
             applyClosure = { ($0 as! C).apply($1) }
             consumersClosure = { ($0 as! C).consumers }
             coderClosure = { ($0 as! C).coder }
-            streamClosure = { AnyPCollectionStream(($0 as! C).stream) }
+            streamClosure = { AnyPCollectionStream(($0 as! C).stream($1)) }
             parentClosure = { ($0 as! C).parent }
             rootsClosure = { ($0 as! PipelineMember).roots }
             streamTypeClosure = { ($0 as! C).streamType }
@@ -64,7 +64,7 @@ public struct AnyPCollection: PCollectionProtocol, 
PipelineMember {
         coderClosure(collection)
     }
 
-    public var stream: PCollectionStream<Never> {
+    public func stream(_ reporter: @escaping ElementReporter) -> 
PCollectionStream<Never> {
         fatalError("Do not use `stream` on AnyPCollection. Use `anyStream` 
instead.")
     }
 
@@ -72,8 +72,8 @@ public struct AnyPCollection: PCollectionProtocol, 
PipelineMember {
         streamTypeClosure(collection)
     }
 
-    public var anyStream: AnyPCollectionStream {
-        streamClosure(collection)
+    public func anyStream(_ reporter: @escaping ElementReporter) -> 
AnyPCollectionStream {
+        streamClosure(collection,reporter)
     }
 
     var roots: [PCollection<Never>] {
diff --git a/Sources/ApacheBeam/Core/PCollection/PCollection.swift 
b/Sources/ApacheBeam/Core/PCollection/PCollection.swift
index d5c0dcc..02e836c 100644
--- a/Sources/ApacheBeam/Core/PCollection/PCollection.swift
+++ b/Sources/ApacheBeam/Core/PCollection/PCollection.swift
@@ -26,6 +26,8 @@ public enum WindowingStrategy {
     case unspecified
 }
 
+public typealias ElementReporter = (Int,Int?) -> Void
+
 public protocol PCollectionProtocol {
     associatedtype Of
 
@@ -34,11 +36,12 @@ public protocol PCollectionProtocol {
     var parent: PipelineTransform? { get }
     var consumers: [PipelineTransform] { get }
     var coder: Coder { get }
-    var stream: Stream { get }
     var streamType: StreamType { get }
 
     @discardableResult
     func apply(_ transform: PipelineTransform) -> PipelineTransform
+    
+    func stream(_ elements: @escaping ElementReporter) -> Stream
 }
 
 public final class PCollection<Of>: PCollectionProtocol {
@@ -62,8 +65,8 @@ public final class PCollection<Of>: PCollectionProtocol {
         streamType = type
     }
 
-    public var stream: PCollectionStream<Of> {
-        staticStream ?? PCollectionStream<Of>()
+    public func stream(_ reporter: @escaping ElementReporter) -> 
PCollectionStream<Of> {
+        return staticStream ?? PCollectionStream<Of>(reporter)
     }
 
     @discardableResult
diff --git a/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift 
b/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
index 76b63ac..1fd8cc3 100644
--- a/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
+++ b/Sources/ApacheBeam/Core/PCollection/PCollectionStream.swift
@@ -24,9 +24,11 @@ public final class PCollectionStream<Of>: AsyncSequence {
 
     private let stream: AsyncStream<Element>
     private let emitter: AsyncStream<Element>.Continuation
+    private let reporter: ElementReporter
 
-    public init() {
+    public init(_ reporter: @escaping ElementReporter) {
         (stream, emitter) = AsyncStream.makeStream(of: Element.self)
+        self.reporter = reporter
     }
 
     public func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
@@ -38,6 +40,7 @@ public final class PCollectionStream<Of>: AsyncSequence {
     }
 
     public func emit(_ value: Element) {
+        reporter(1,nil)
         emitter.yield(value)
     }
 
diff --git a/Sources/ApacheBeam/Runners/PortableRunner.swift 
b/Sources/ApacheBeam/Runners/PortableRunner.swift
index f4591e2..84f8738 100644
--- a/Sources/ApacheBeam/Runners/PortableRunner.swift
+++ b/Sources/ApacheBeam/Runners/PortableRunner.swift
@@ -64,7 +64,7 @@ public struct PortableRunner: PipelineRunner {
             })
             log.info("Job \(job.jobID) status: \(status.state)")
             switch status.state {
-            case .stopped, .failed, .done:
+            case .stopped, .failed, .done, .cancelled:
                 done = true
             default:
                 try await Task.sleep(for: .seconds(5))
diff --git a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift 
b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
index 413186b..b2e2542 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/BundleProcessor.swift
@@ -20,6 +20,9 @@ import Logging
 
 import Foundation
 
+typealias ProgressUpdater = 
(AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
 -> Void
+
+
 struct BundleProcessor {
     let log: Logging.Logger
 
@@ -32,28 +35,38 @@ struct BundleProcessor {
     }
 
     let steps: [Step]
+    let (bundleMetrics,bundleMetricsReporter) = AsyncStream.makeStream(of: 
MetricCommand.self)
 
     init(id: String,
          descriptor: 
Org_Apache_Beam_Model_FnExecution_V1_ProcessBundleDescriptor,
          collections: [String: AnyPCollection],
-         fns: [String: SerializableFn]) throws
+         fns: [String: SerializableFn],
+         registry: MetricsRegistry) async throws
     {
         log = Logging.Logger(label: "BundleProcessor(\(id) \(descriptor.id))")
 
         var temp: [Step] = []
-        var coders = BundleCoderContainer(bundle: descriptor)
+        let coders = BundleCoderContainer(bundle: descriptor)
 
         var streams: [String: AnyPCollectionStream] = [:]
         // First make streams for everything in this bundle (maybe I could use 
the pcollection array for this?)
         for (_, transform) in descriptor.transforms {
             for id in transform.inputs.values {
                 if streams[id] == nil {
-                    streams[id] = collections[id]!.anyStream
+                    let metrics = MetricReporter(registry: registry, reporter: 
bundleMetricsReporter,transform: transform.uniqueName, pcollection: id)
+                    let elementsRead = await metrics.elementCount(name: 
"total-elements-read")
+                    streams[id] = collections[id]!.anyStream({ count,_ in
+                        elementsRead(count)
+                    })
                 }
             }
             for id in transform.outputs.values {
                 if streams[id] == nil {
-                    streams[id] = collections[id]!.anyStream
+                    let metrics = MetricReporter(registry: registry, reporter: 
bundleMetricsReporter,transform: transform.uniqueName, pcollection: id)
+                    let elementsWritten = await metrics.elementCount(name: 
"total-elements-written")
+                    streams[id] = collections[id]!.anyStream({ count,_ in
+                        elementsWritten(count)
+                    })
                 }
             }
         }
@@ -104,15 +117,33 @@ struct BundleProcessor {
         }
         steps = temp
     }
-
-    public func process(instruction: String, responder: 
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
 async {
+    
+    public func process(instruction: String, accumulator: MetricAccumulator, 
responder: 
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation)
 async {
+        
+        // Start metric handling. This should complete after the group
+        Task {
+            var reporter = await accumulator.reporter
+            log.info("Monitoring bundle metrics for \(instruction)")
+            for await command in bundleMetrics {
+                switch command {
+                case .update(_, _):
+                    reporter.yield(command)
+                case .report(_):
+                    continue
+                case .finish(_):
+                    log.info("Done monitoring bundle metrics for 
\(instruction)")
+                    return
+                }
+            }
+        }
+        
         _ = await withThrowingTaskGroup(of: (String, String).self) { group in
             log.info("Starting bundle processing for \(instruction)")
             var count: Int = 0
             do {
                 for step in steps {
                     log.info("Starting Task \(step.transformId)")
-                    let context = SerializableFnBundleContext(instruction: 
instruction, transform: step.transformId, payload: step.payload, log: log)
+                    let context = SerializableFnBundleContext(instruction: 
instruction, transform: step.transformId, payload: step.payload, metrics: await 
MetricReporter(accumulator: accumulator, transform: step.transformId), log: log)
                     group.addTask {
                         try await step.fn.process(context: context, inputs: 
step.inputs, outputs: step.outputs)
                     }
@@ -123,10 +154,18 @@ struct BundleProcessor {
                     finished += 1
                     log.info("Task Completed (\(instruction),\(transform)) 
\(finished) of \(count)")
                 }
-                log.info("All tasks completed for \(instruction)")
-                responder.yield(.with {
-                    $0.instructionID = instruction
-                })
+                bundleMetricsReporter.yield(.finish({ _,_ in }))
+                await accumulator.reporter.yield(.finish({ 
metricInfo,metricData in
+                    log.info("All tasks completed for \(instruction)")
+                    responder.yield(.with {
+                        $0.instructionID = instruction
+                        $0.processBundle = .with {
+                            $0.monitoringData.merge(metricData, 
uniquingKeysWith: {a,b in b})
+                            $0.monitoringInfos.append(contentsOf: metricInfo)
+                            $0.requiresFinalization = true
+                        }
+                    })
+                }));
             } catch {
                 responder.yield(.with {
                     $0.instructionID = instruction
diff --git a/Sources/ApacheBeam/Runtime/Bundle/Sink.swift 
b/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
index c719d45..58c92cc 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/Sink.swift
@@ -21,7 +21,7 @@ import Foundation
 final class Sink: SerializableFn {
     let client: DataplaneClient
     let coder: Coder
-
+    
     public init(client: DataplaneClient, coder: Coder) {
         self.client = client
         self.coder = coder
@@ -30,12 +30,20 @@ final class Sink: SerializableFn {
     func process(context: SerializableFnBundleContext,
                  inputs: [AnyPCollectionStream], outputs _: 
[AnyPCollectionStream]) async throws -> (String, String)
     {
+        let bytesWritten = await context.metrics.counter(name: "bytes-written")
+        let recordsWritten = await context.metrics.counter(name: 
"records-written")
         let (_, emitter) = await client.makeStream(instruction: 
context.instruction, transform: context.transform)
+        var bytes = 0
+        var records = 0
         for try await element in inputs[0] {
             var output = Data()
             try coder.encode(element, data: &output)
+            bytes += output.count
+            records += 1
             emitter.yield(.data(output))
         }
+        bytesWritten(bytes)
+        recordsWritten(records)
         emitter.yield(.last(context.instruction, context.transform))
         emitter.finish()
         await client.finalizeStream(instruction: context.instruction, 
transform: context.transform)
diff --git a/Sources/ApacheBeam/Runtime/Bundle/Source.swift 
b/Sources/ApacheBeam/Runtime/Bundle/Source.swift
index adb6885..f752d47 100644
--- a/Sources/ApacheBeam/Runtime/Bundle/Source.swift
+++ b/Sources/ApacheBeam/Runtime/Bundle/Source.swift
@@ -24,7 +24,10 @@ import Logging
 final class Source: SerializableFn {
     let client: DataplaneClient
     let coder: Coder
+
     let log: Logger
+    
+    
 
     public init(client: DataplaneClient, coder: Coder) {
         self.client = client
@@ -36,6 +39,8 @@ final class Source: SerializableFn {
                  inputs _: [AnyPCollectionStream], outputs: 
[AnyPCollectionStream]) async throws -> (String, String)
     {
         log.info("Waiting for input on 
\(context.instruction)-\(context.transform)")
+        let bytesRead = await context.metrics.counter(name: "bytes-read")
+        let recordsRead = await context.metrics.counter(name: "records-read")
         let (stream, _) = await client.makeStream(instruction: 
context.instruction, transform: context.transform)
 
         var messages = 0
@@ -45,13 +50,19 @@ final class Source: SerializableFn {
             switch message {
             case let .data(data):
                 var d = data
+                let totalBytes = data.count
+                var localCount = 0
                 while d.count > 0 {
                     let value = try coder.decode(&d)
                     for output in outputs {
                         try output.emit(value: value)
-                        count += 1
                     }
+                    count += 1
+                    localCount += 1
                 }
+                bytesRead(totalBytes - d.count)
+                recordsRead(localCount)
+                
             case let .last(id, transform):
                 for output in outputs {
                     output.finish()
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift 
b/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift
new file mode 100644
index 0000000..e140847
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricAccumulator.swift
@@ -0,0 +1,78 @@
+//
+//  File.swift
+//  
+//
+//  Created by Byron Ellis on 9/2/24.
+//
+
+import Foundation
+import Logging
+
+typealias MetricDataFn = 
([Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo],Dictionary<String,Data>) -> 
Void
+
+enum MetricCommand {
+    case update(String,ReportableMetric)
+    case report(MetricDataFn)
+    case finish(MetricDataFn)
+}
+
+typealias MetricStream = AsyncStream<MetricCommand>
+typealias MetricStreamReporter = MetricStream.Continuation
+
+public actor MetricAccumulator {
+    let log: Logging.Logger
+
+    var registry: MetricsRegistry
+    var values: [String:ReportableMetric] = [:]
+    var stream:MetricStream
+    var _reporter:MetricStreamReporter
+    
+    var reporter: MetricStreamReporter { get {
+        return _reporter
+    }}
+    
+    public init(instruction: String, registry: MetricsRegistry) {
+        self.registry = registry
+        (stream,_reporter) = AsyncStream.makeStream(of:MetricCommand.self)
+        self.log = Logging.Logger(label: "Accumulator(\(instruction))")
+    }
+    
+    func reportMetrics(_ to:MetricDataFn) async {
+        var info: [Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo] = []
+        var outputs: [String:Data] = [:]
+        for (mId,value) in values {
+            do {
+                let payload = try value.encode()
+                info.append(await registry.monitoringInfo(mId, payload: 
payload))
+                outputs[mId] = payload
+            } catch {
+                log.error("Unable to write metric \(mId): \(error)")
+            }
+        }
+        to(info,outputs)
+    }
+    
+    public func start() {
+        Task {
+            log.info("Starting metric command processing.")
+            for await cmd in stream {
+                switch cmd {
+                case let .update(mId, value):
+                    if let o = values[mId] {
+                        values.updateValue(o.merge(other: value), forKey: mId)
+                    } else {
+                        values[mId] = value
+                    }
+                case let .report(to):
+                    await reportMetrics(to)
+                case let .finish(to):
+                    await reportMetrics(to)
+                    values.removeAll()
+                    break
+                }
+            }
+            log.info("Shutting down metric command processing.")
+        }
+    }
+    
+}
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift 
b/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
index fb5b59e..93de51f 100644
--- a/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricReporter.swift
@@ -16,5 +16,58 @@
  * limitations under the License.
  */
 
-/// Background actor that handles reporting metrics to the backend. Allows the 
metrics implementation to be asynchronous.
-actor MetricReporter {}
+
+public typealias Counter = (Int) -> Void
+public typealias Distribution = (Int) -> Void
+
+public struct MetricReporter {
+    
+    let registry: MetricsRegistry
+    let reporter: MetricStreamReporter
+    let transform : String
+    let pcollection: String
+    
+    init(registry: MetricsRegistry,reporter: MetricStreamReporter, transform: 
String = "",pcollection: String = "") {
+        self.registry = registry
+        self.reporter = reporter
+        self.transform = transform
+        self.pcollection = pcollection
+    }
+    
+    init(accumulator: MetricAccumulator, transform: String = "", pcollection: 
String = "") async {
+        self.init(registry: await accumulator.registry, reporter: await 
accumulator.reporter,transform: transform, pcollection: pcollection)
+    }
+    
+    public func register(_ name: String,namespace:String = "",transform:String 
= "", pcollection:String = "",initialValue: ReportableMetric) async -> String {
+        return await 
registry.register(name,urn:initialValue.urn,type:initialValue.type,namespace:namespace,transform:transform,pcollection:pcollection)
+    }
+    
+    public func counter(name: String, namespace: String = "", pcollection: 
String? = nil) async -> Counter {
+        let value:ReportableMetric = .counter(0)
+        let metricId = await register(name,namespace: namespace,transform: 
transform, pcollection: pcollection ?? self.pcollection, initialValue: value)
+        reporter.yield(.update(metricId, value))
+        return { update in
+            reporter.yield(.update(metricId, .counter(update)))
+        }
+    }
+    public func elementCount(name:String, namespace: String = 
"",transform:String? = nil,pcollection: String? = nil) async -> Counter {
+        let value: ReportableMetric = .counter(0)
+        let metricId = await 
registry.register(name,urn:"beam:metric:element_count:v1",type:"beam:metrics:sum_int64:v1",
+                                               transform: transform ?? 
self.transform,pcollection: pcollection ?? self.pcollection)
+        reporter.yield(.update(metricId, value))
+        return { update in
+            reporter.yield(.update(metricId, .counter(update)))
+        }
+    }
+    
+    public func distribution(name : String, namespace: String = "", 
pcollection: String? = nil) async -> Distribution {
+        let value:ReportableMetric = .distribution(0,0,Int.max,Int.min)
+        let metricId = await register(name,namespace: namespace,transform: 
transform, pcollection: pcollection ?? self.pcollection, initialValue: value)
+        reporter.yield(.update(metricId, value))
+        return { update in
+            reporter.yield(.update(metricId, 
.distribution(1,update,update,update)))
+        }
+    }
+}
+
+
diff --git a/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift 
b/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift
new file mode 100644
index 0000000..40d8db9
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/MetricsRegistry.swift
@@ -0,0 +1,64 @@
+//
+//  File.swift
+//  
+//
+//  Created by Byron Ellis on 9/3/24.
+//
+
+import Foundation
+
+public struct MetricDefinition {
+    let id: String
+    let name: String
+    let labels: [String:String]
+    let urn: String
+    let type: String
+}
+
+
+public actor MetricsRegistry {
+    
+    var definitions: [String:MetricDefinition] = [:]
+    var names: [String:String] = [:]
+    var id: Int = 1
+
+    public func register(_ name: 
String,urn:String,type:String,namespace:String = "",transform:String = 
"",pcollection:String = "") -> String {
+        let fullName = "\(transform)-\(namespace)-\(name)"
+        if let metricId = names[fullName] {
+            return metricId
+        } else {
+            let metricId = "m\(id)"
+            id = id + 1
+            var l:[String:String] = [:]
+            l["NAME"] = name
+            l["NAMESPACE"] = namespace
+            
+            if transform != "" {
+                l["TRANSFORM"] = transform
+            }
+            if pcollection != "" {
+                l["PCOLLECTION"] = pcollection
+            }
+            definitions[metricId] = MetricDefinition(id: metricId, name: 
fullName, labels: l, urn: urn, type: type)
+            return metricId
+        }
+    }
+    
+    
+
+}
+
+extension MetricsRegistry {
+    func monitoringInfo(_ id: String,payload: Data? = nil) -> 
Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo {
+        return .with {
+            if let def = definitions[id] {
+                $0.type = def.type
+                $0.urn = def.urn
+                $0.labels.merge(def.labels, uniquingKeysWith: { $1 })
+            }
+            if let p = payload {
+                $0.payload = p
+            }
+        }
+    }
+}
diff --git a/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift 
b/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift
new file mode 100644
index 0000000..724b017
--- /dev/null
+++ b/Sources/ApacheBeam/Runtime/Metrics/ReportableMetric.swift
@@ -0,0 +1,56 @@
+import Foundation
+
+public enum ReportableMetric {
+    case counter(Int)
+    case distribution(Int,Int,Int,Int)
+    
+    public func merge(other:ReportableMetric) -> ReportableMetric {
+        switch (self,other) {
+        case let (.counter(val),.counter(other_val)):
+            return .counter(val+other_val)
+        case let (.counter(val),.distribution(count, sum, min, max)):
+            return .counter(val + sum)
+        case let (.distribution(count,sum,min,max),.distribution(other_count, 
other_sum, other_min, other_max)):
+            return .distribution(count+other_count, sum+other_sum, min < 
other_min ? min : other_min, max > other_max ? max : other_max)
+        case let (.distribution(count,sum,min,max),.counter(val)):
+            return .distribution(count+1, sum+val, min < val ? min : val, max 
> val ? max : val)
+        }
+    }
+    public var type: String { get {
+        switch self {
+        case .counter(_):
+            return "beam:metrics:sum_int64:v1"
+        case .distribution(_, _, _, _):
+            return "beam:metrics:distribution_int64:v1"
+        }
+    }}
+
+    public var urn: String { get {
+        switch self {
+        case .counter(_):
+            return "beam:metric:user:sum_int64:v1"
+        case .distribution(_, _, _, _):
+            return "beam:metric:user:distribution_int64:v1"
+        }
+    }}
+    
+    public func update(val:Int) -> ReportableMetric {
+        switch self {
+        case let .counter(current):
+            return .counter(current+val)
+        case let .distribution(count, sum, min, max):
+            return .distribution(count+1, sum+val, min < val ? min : val, max 
> val ? max : val)
+        }
+    }
+    
+    
+    public func encode() throws -> Data {
+        switch self {
+        case let .counter(c):
+            return try Coder.varint.encode(c)
+        case let .distribution(count, sum, min, max):
+            return try Coder.iterable(Coder.varint).encode([count,sum,min,max])
+        }
+    }
+}
+
diff --git a/Sources/ApacheBeam/Runtime/Worker/Worker.swift 
b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
index 3588770..a48e93a 100644
--- a/Sources/ApacheBeam/Runtime/Worker/Worker.swift
+++ b/Sources/ApacheBeam/Runtime/Worker/Worker.swift
@@ -20,6 +20,10 @@ import GRPC
 import Logging
 import NIOCore
 
+typealias InstructionResponder = 
AsyncStream<Org_Apache_Beam_Model_FnExecution_V1_InstructionResponse>.Continuation
+
+
+
 actor Worker {
     private let id: String
     private let collections: [String: AnyPCollection]
@@ -28,6 +32,7 @@ actor Worker {
     private let remoteLog: ApiServiceDescriptor
 
     private let log: Logging.Logger
+    private let registry: MetricsRegistry
 
     public init(id: String, control: ApiServiceDescriptor, log: 
ApiServiceDescriptor, collections: [String: AnyPCollection], functions: 
[String: SerializableFn]) {
         self.id = id
@@ -37,6 +42,7 @@ actor Worker {
         remoteLog = log
 
         self.log = Logging.Logger(label: "Worker(\(id))")
+        self.registry = MetricsRegistry()
     }
 
     public func start() throws {
@@ -49,37 +55,83 @@ actor Worker {
         // Start the response task. This will continue until a yield call is 
sent from responder
         Task {
             for await r in responses {
-                log.info("Sending response \(r)")
+//                log.info("Sending response \(r)")
                 try await control.requestStream.send(r)
             }
         }
+        
         // Start the actual work task
         Task {
             log.info("Waiting for control plane instructions.")
             var processors: [String: BundleProcessor] = [:]
+            var metrics: [String: MetricAccumulator] = [:]
 
             func processor(for bundle: String) async throws -> BundleProcessor 
{
                 if let processor = processors[bundle] {
                     return processor
                 }
                 let descriptor = try await 
client.getProcessBundleDescriptor(.with { $0.processBundleDescriptorID = bundle 
})
-                let processor = try BundleProcessor(id: id, descriptor: 
descriptor, collections: collections, fns: fns)
+                let processor = try await BundleProcessor(id: id, descriptor: 
descriptor, collections: collections, fns: fns, registry: registry)
                 processors[bundle] = processor
                 return processor
             }
 
             // This looks a little bit reversed from the usual because 
response don't need an initiating call
             for try await instruction in control.responseStream {
+//                log.info("\(instruction)")
                 switch instruction.request {
                 case let .processBundle(pbr):
                     do {
                         let p = try await processor(for: 
pbr.processBundleDescriptorID)
+                        let accumulator = MetricAccumulator(instruction: 
instruction.instructionID, registry: registry)
+                        await accumulator.start()
+                        metrics[instruction.instructionID] = accumulator
                         Task {
-                            await p.process(instruction: 
instruction.instructionID, responder: responder)
+                            await p.process(instruction: 
instruction.instructionID,accumulator: accumulator, responder: responder)
                         }
                     } catch {
                         log.error("Unable to process bundle 
\(pbr.processBundleDescriptorID): \(error)")
                     }
+                case let .processBundleProgress(pbpr):
+//                    log.info("Requesting bundle progress of 
\(pbpr.instructionID)")
+                    if let accumulator = metrics[pbpr.instructionID] {
+                        await accumulator.reporter.yield(.report({ metricInfo, 
metricData in
+                            responder.yield(.with {
+                                $0.instructionID = instruction.instructionID
+                                $0.processBundleProgress = .with {
+                                    $0.monitoringData.merge(metricData, 
uniquingKeysWith: { $1 })
+                                    $0.monitoringInfos.append(contentsOf: 
metricInfo)
+                                }
+                            })
+                        }))
+                    }
+                case let .processBundleSplit(pbsr):
+                    log.info("Requesting bundle split for 
\(pbsr.instructionID)")
+                    responder.yield(.with {
+                        $0.instructionID = instruction.instructionID
+                        $0.processBundleSplit = .with { _ in }
+                    })
+                case let .finalizeBundle(fbr):
+                    log.info("Finializing bundle \(fbr.instructionID)")
+                    metrics.removeValue(forKey: fbr.instructionID)
+                    responder.yield(.with {
+                        $0.instructionID = fbr.instructionID
+                        $0.finalizeBundle = .with { _ in }
+                    })
+                case let .monitoringInfos(mimr):
+                    log.info("Requesting monitoring information")
+                    var tmp: 
[String:Org_Apache_Beam_Model_Pipeline_V1_MonitoringInfo] = [:]
+                    for id in mimr.monitoringInfoID {
+                        tmp[id] = await registry.monitoringInfo(id)
+                    }
+                    log.info("\(tmp)")
+                    responder.yield(.with {
+                        $0.instructionID = instruction.instructionID
+                        $0.monitoringInfos = .with {
+                            $0.monitoringInfo.merge(tmp, uniquingKeysWith: { 
$1 })
+                        }
+                    })
+
                 default:
                     log.warning("Ignoring instruction 
\(instruction.instructionID). Not yet implemented.")
                     log.warning("\(instruction)")
diff --git a/Sources/ApacheBeam/Testing/PCollection+Testing.swift 
b/Sources/ApacheBeam/Testing/PCollection+Testing.swift
index d0dc11f..f52ee30 100644
--- a/Sources/ApacheBeam/Testing/PCollection+Testing.swift
+++ b/Sources/ApacheBeam/Testing/PCollection+Testing.swift
@@ -21,7 +21,7 @@ import Foundation
 public extension PCollection {
     /// Create a PCollection whose stream has been preloaded with some values 
for testing
     static func testValues<V: Beamable>(_ values: [V]) -> PCollection<V> {
-        let stream = PCollectionStream<V>()
+        let stream = PCollectionStream<V>({ _,_ in })
         for v in values {
             stream.emit(v, timestamp: .now, window: .global)
         }
diff --git a/Sources/ApacheBeam/Testing/PCollectionTest.swift 
b/Sources/ApacheBeam/Testing/PCollectionTest.swift
index 641656a..f9e9520 100644
--- a/Sources/ApacheBeam/Testing/PCollectionTest.swift
+++ b/Sources/ApacheBeam/Testing/PCollectionTest.swift
@@ -33,9 +33,9 @@ public struct PCollectionTest {
         if let transform = output.parent {
             switch transform {
             case let .pardo(parent, _, fn, outputs):
-                let context = try SerializableFnBundleContext(instruction: 
"1", transform: "test", payload: fn.payload, log: log)
-                let input = parent.anyStream
-                let streams = outputs.map(\.anyStream)
+                let context = try SerializableFnBundleContext(instruction: 
"1", transform: "test", payload: fn.payload, metrics: await 
MetricReporter(accumulator: 
MetricAccumulator(instruction:"inst000",registry:MetricsRegistry())), log: log)
+                let input = parent.anyStream({ _,_ in })
+                let streams = outputs.map({ $0.anyStream({ _,_ in }) })
 
                 try await withThrowingTaskGroup(of: Void.self) { group in
                     log.info("Starting process task")
diff --git a/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift 
b/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
index 8ee73d2..0b2eb35 100644
--- a/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
+++ b/Tests/ApacheBeamTests/Pipeline/FileIOTests.swift
@@ -58,7 +58,6 @@ final class FileIOTests: XCTestCase {
     }
 
     func testShakespeareWordcount() async throws {
-        throw XCTSkip()
         try await Pipeline { pipeline in
             let contents = pipeline
                 .create(["dataflow-samples/shakespeare"])

Reply via email to