ajamato commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412564549
##########
File path:
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
(Coder<WindowedValue<?>>) remoteOutputCoder.getValue(),
outputContents::add));
}
- Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+ final String testPTransformId = "create/ParMultiDo(Metrics)";
+ BundleProgressHandler progressHandler =
+ new BundleProgressHandler() {
+ @Override
+ public void onProgress(ProcessBundleProgressResponse response) {
+ MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+ List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
- StateRequestHandler stateRequestHandler =
- StateRequestHandlers.forSideInputHandlerFactory(
- descriptor.getSideInputSpecs(),
- new SideInputHandlerFactory() {
- @Override
- public <V, W extends BoundedWindow>
- IterableSideInputHandler<V, W> forIterableSideInput(
- String pTransformId,
- String sideInputId,
- Coder<V> elementCoder,
- Coder<W> windowCoder) {
- throw new UnsupportedOperationException();
- }
+ // We expect all user counters except for the ones in @FinishBundle
+ // Since non-user metrics are registered at bundle creation time,
they will still report
+ // values most of which will be 0.
- @Override
- public <K, V, W extends BoundedWindow>
- MultimapSideInputHandler<K, V, W> forMultimapSideInput(
- String pTransformId,
- String sideInputId,
- KvCoder<K, V> elementCoder,
- Coder<W> windowCoder) {
- return new MultimapSideInputHandler<K, V, W>() {
- @Override
- public Iterable<V> get(BoundedWindow window) {
- return null;
- }
+ SimpleMonitoringInfoBuilder builder = new
SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64SumValue(1);
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- @Override
- public Coder<K> keyCoder() {
- return elementCoder.getKeyCoder();
- }
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.START_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64SumValue(10);
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- @Override
- public Coder<V> valueCoder() {
- return elementCoder.getValueCoder();
- }
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.FINISH_USER_COUNTER_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+
matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
- @Override
- public Iterable<V> get(K key, W window) {
- return (Iterable) sideInputData;
- }
- };
- }
- });
+ // User Distributions.
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
+ MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64DistributionValue(DistributionData.create(1, 1, 1,
1));
+
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
- String testPTransformId = "create/ParMultiDo(Anonymous)";
- BundleProgressHandler progressHandler =
- new BundleProgressHandler() {
- @Override
- public void onProgress(ProcessBundleProgressResponse progress) {}
+ builder = new SimpleMonitoringInfoBuilder();
+ builder
+ .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAMESPACE,
RemoteExecutionTest.class.getName())
+ .setLabel(
+ MonitoringInfoConstants.Labels.NAME,
MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+ builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM,
testPTransformId);
+ builder.setInt64DistributionValue(DistributionData.create(10, 1,
10, 10));
Review comment:
Are these values likely to easily change? You could consider writing new
matchers in MonitoringInfoMatchers to match that the values are non 0 or
something instead to make it easier to maintain.
Or write matchers that just verify a few fields, unless you really want to
verify everything is set on every MonitoringInfo. Might also make it simpler to
maintain. Unless you think they are all relevant. Up to your discretion here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]