[
https://issues.apache.org/jira/browse/BEAM-6138?focusedWorklogId=173072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173072
]
ASF GitHub Bot logged work on BEAM-6138:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/18 21:36
Start Date: 07/Dec/18 21:36
Worklog Time Spent: 10m
Work Description: ajamato commented on a change in pull request #6799:
[BEAM-6138] Add User Counter Metric Support to Java SDK
URL: https://github.com/apache/beam/pull/6799#discussion_r239949691
##########
File path:
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -400,6 +497,123 @@ public void processElement(ProcessContext context) {
}
}
+ @Test
+ public void testMetrics() throws Exception {
+ Pipeline p = Pipeline.create();
+ PCollection<String> input =
+ p.apply("impulse", Impulse.create())
+ .apply(
+ "create",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ Metrics.counter(RemoteExecutionTest.class,
"counterMetric").inc();
+ }
+ }))
+ .setCoder(StringUtf8Coder.of());
+
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+ Optional<ExecutableStage> optionalStage =
+ Iterables.tryFind(fused.getFusedStages(), (ExecutableStage stage) ->
true);
+ checkState(optionalStage.isPresent(), "Expected a stage with side
inputs.");
+ ExecutableStage stage = optionalStage.get();
+
+ ExecutableProcessBundleDescriptor descriptor =
+ ProcessBundleDescriptors.fromExecutableStage(
+ "test_stage",
+ stage,
+ dataServer.getApiServiceDescriptor(),
+ stateServer.getApiServiceDescriptor());
+
+ BundleProcessor processor =
+ controlClient.getProcessor(
+ descriptor.getProcessBundleDescriptor(),
+ descriptor.getRemoteInputDestinations(),
+ stateDelegator);
+
+ Map<Target, Coder<WindowedValue<?>>> outputTargets =
descriptor.getOutputTargetCoders();
+ Map<Target, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
+ Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+ for (Entry<Target, Coder<WindowedValue<?>>> targetCoder :
outputTargets.entrySet()) {
+ List<WindowedValue<?>> outputContents = Collections.synchronizedList(new
ArrayList<>());
+ outputValues.put(targetCoder.getKey(), outputContents);
+ outputReceivers.put(
+ targetCoder.getKey(),
+ RemoteOutputReceiver.of(targetCoder.getValue(),
outputContents::add));
+ }
+
+ Iterable<byte[]> sideInputData =
+ Arrays.asList(
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+ CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
+
+ StateRequestHandler stateRequestHandler =
+ StateRequestHandlers.forSideInputHandlerFactory(
+ descriptor.getSideInputSpecs(),
+ new SideInputHandlerFactory() {
+ @Override
+ public <T, V, W extends BoundedWindow> SideInputHandler<V, W>
forSideInput(
+ String pTransformId,
+ String sideInputId,
+ RunnerApi.FunctionSpec accessPattern,
+ Coder<T> elementCoder,
+ Coder<W> windowCoder) {
+ return new SideInputHandler<V, W>() {
+ @Override
+ public Iterable<V> get(byte[] key, W window) {
+ return (Iterable) sideInputData;
+ }
+
+ @Override
+ public Coder<V> resultCoder() {
+ return ((KvCoder) elementCoder).getValueCoder();
+ }
+ };
+ }
+ });
+
+ BundleProgressHandler progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse progress) {}
+
+ @Override
+ public void onCompleted(ProcessBundleResponse response) {
+ // Assert the timestamps are non empty then 0 them out before
comparing.
+ List<MonitoringInfo> actualMIs = new ArrayList<>();
+ for (MonitoringInfo mi : response.getMonitoringInfosList()) {
+ MonitoringInfo.Builder builder = MonitoringInfo.newBuilder();
+ Assert.assertTrue(mi.getTimestamp().getSeconds() > 0);
+ builder.mergeFrom(mi);
+ builder.clearTimestamp();
+ actualMIs.add(builder.build());
+ }
+
+ SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder();
+ builder.setUrnForUserMetric(RemoteExecutionTest.class.getName(),
"counterMetric");
Review comment:
done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 173072)
Time Spent: 1h 50m (was: 1h 40m)
> Add User Metric Support to Java SDK
> -----------------------------------
>
> Key: BEAM-6138
> URL: https://issues.apache.org/jira/browse/BEAM-6138
> Project: Beam
> Issue Type: New Feature
> Components: java-fn-execution
> Reporter: Alex Amato
> Assignee: Alex Amato
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)