m-trieu commented on code in PR #29592:
URL: https://github.com/apache/beam/pull/29592#discussion_r1414692392


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+public class ActiveMessageMetadata {
+
+  public String userStepName;

Review Comment:
   If this is a value class, consider using AutoValue (go/autovalue).
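
For context, a minimal plain-Java sketch of the boilerplate a value class needs when written by hand, which is roughly what AutoValue would generate from abstract accessors. Only `userStepName` comes from the diff; the class name and the `startTimeMillis` field are hypothetical and used only for illustration.

```java
import java.util.Objects;

// Hand-written immutable value class: private final fields, accessors,
// and value-based equals/hashCode/toString.
final class ActiveMessageMetadataSketch {
  private final String userStepName;
  private final long startTimeMillis; // hypothetical field, not from the diff

  ActiveMessageMetadataSketch(String userStepName, long startTimeMillis) {
    this.userStepName = Objects.requireNonNull(userStepName);
    this.startTimeMillis = startTimeMillis;
  }

  String userStepName() {
    return userStepName;
  }

  long startTimeMillis() {
    return startTimeMillis;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ActiveMessageMetadataSketch)) {
      return false;
    }
    ActiveMessageMetadataSketch that = (ActiveMessageMetadataSketch) o;
    return userStepName.equals(that.userStepName) && startTimeMillis == that.startTimeMillis;
  }

  @Override
  public int hashCode() {
    return Objects.hash(userStepName, startTimeMillis);
  }

  @Override
  public String toString() {
    return "ActiveMessageMetadataSketch{userStepName=" + userStepName
        + ", startTimeMillis=" + startTimeMillis + "}";
  }
}
```

With AutoValue, the fields, constructor, and the three overrides above would not need to be maintained by hand.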



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -111,15 +120,58 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
       if (duration.equals(Duration.ZERO)) {
         continue;
       }
-      list.add(
-          Windmill.LatencyAttribution.newBuilder()
-              .setState(state)
-              .setTotalDurationMillis(duration.getMillis())
-              .build());
+      LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder();
+      if (state == LatencyAttribution.State.ACTIVE) {
+        laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, workId, sampler);
+      }
+      Windmill.LatencyAttribution la =
+          laBuilder.setState(state).setTotalDurationMillis(duration.getMillis()).build();
+      list.add(la);
     }
     return list;
   }
 
+  private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(
+      Boolean isHeartbeat,

Review Comment:
   Unless `isHeartbeat` is nullable, prefer the primitive type (`boolean`) over
the boxed `Boolean`. go/java-practices/primitives#prefer-unboxed
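
A small sketch of why the boxed type is riskier here. The class and method names are hypothetical and not part of the PR; the point is only that a `Boolean` parameter accepts `null`, and implicit unboxing of `null` throws.

```java
final class HeartbeatFlagSketch {
  // Boxed parameter: callers can pass null, and the implicit unboxing
  // in the conditional expression then throws NullPointerException.
  static String describeBoxed(Boolean isHeartbeat) {
    return isHeartbeat ? "heartbeat" : "regular";
  }

  // Primitive parameter: null cannot be passed, so the failure mode is gone.
  static String describePrimitive(boolean isHeartbeat) {
    return isHeartbeat ? "heartbeat" : "regular";
  }

  public static void main(String[] args) {
    System.out.println(describePrimitive(true));
    try {
      describeBoxed(null);
    } catch (NullPointerException e) {
      System.out.println("unboxing a null Boolean throws NullPointerException");
    }
  }
}
```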



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1892,6 +1891,14 @@ private void invalidateStuckCommits() {
     }
   }
 
+  public static String constructWorkId(Windmill.WorkItem workItem) {

Review Comment:
   How about moving this into `Work.java`? Then it could be called as
`Work.getId`, `Work.getLatencyTrackingId`, etc.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTimeUtils.MillisProvider;
+
+public class DataflowExecutionStateSampler extends ExecutionStateSampler {

Review Comment:
   Does this class need to be thread-safe?
   
   If yes, note that `HashMap` is not safe for concurrent writes. Consider using
`ConcurrentHashMap` if the operations that need to be thread-safe are independent.
If operations must be atomic across multiple data structures, they will need to
be synchronized via a lock (synchronized methods, synchronized blocks,
`synchronized (someLockObject)`).
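
To make the two options concrete, here is a rough sketch, not the PR's code. All names (`SamplerStateSketch`, `recordActive`, `inFlight`, `completed`, and so on) are hypothetical. The first map only needs independent per-key updates, so a `ConcurrentHashMap` is enough; the other two maps must change together, so a single lock guards both.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SamplerStateSketch {
  // Case 1: independent per-key updates. ConcurrentHashMap.merge is atomic per key.
  private final Map<String, Long> activeMillisByWorkId = new ConcurrentHashMap<>();

  void recordActive(String workId, long millis) {
    activeMillisByWorkId.merge(workId, millis, Long::sum);
  }

  long activeMillis(String workId) {
    return activeMillisByWorkId.getOrDefault(workId, 0L);
  }

  // Case 2: two maps that must change together, so one lock guards both.
  private final Object lock = new Object();
  private final Map<String, Long> inFlight = new HashMap<>();
  private final Map<String, Long> completed = new HashMap<>();

  void start(String workId, long millis) {
    synchronized (lock) {
      inFlight.put(workId, millis);
    }
  }

  // Moves an entry from inFlight to completed as one atomic step.
  void complete(String workId) {
    synchronized (lock) {
      Long millis = inFlight.remove(workId);
      if (millis != null) {
        completed.merge(workId, millis, Long::sum);
      }
    }
  }

  long completedMillis(String workId) {
    synchronized (lock) {
      return completed.getOrDefault(workId, 0L);
    }
  }

  // Four threads each add 1000 one-millisecond samples for the same work id.
  static long concurrentDemo() {
    SamplerStateSketch state = new SamplerStateSketch();
    Thread[] threads = new Thread[4];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          state.recordActive("work-1", 1L);
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return state.activeMillis("work-1");
  }
}
```

With a plain `HashMap` in case 1, the concurrent updates could be lost or corrupt the map; the lock in case 2 is only needed because the remove and the merge have to be observed together.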



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTimeUtils.MillisProvider;
+
+public class DataflowExecutionStateSampler extends ExecutionStateSampler {

Review Comment:
   For example, if the operations do not need to be atomic together, you can
make these `ConcurrentMap`s and then drop the synchronization for
`resetForWorkId`, since the map will handle it internally.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -307,5 +335,34 @@ public Closeable enterState(ExecutionState newState) {
     public String getWorkItemId() {
       return this.workItemId;
     }
+
+    public ActiveMessageMetadata getActiveMessageMetadata() {
+      return activeMessageMetadata;
+    }
+
+    public Map<String, IntSummaryStatistics> getProcessingTimesByStep() {

Review Comment:
   This returns the underlying reference, which allows callers to mutate the map.
   Unless that's intentional, consider wrapping it in
`Collections.unmodifiableMap(processingTimesByStep)` or returning an
`ImmutableMap`. go/java-practices/immutability#collections
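
A minimal sketch of the wrapping suggestion, using only the JDK. The enclosing class and the `record` method are hypothetical; only the getter name and map type mirror the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.Map;

final class ProcessingTimesSketch {
  private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();

  // Hypothetical writer: records one processing time sample for a step.
  void record(String step, int millis) {
    processingTimesByStep.computeIfAbsent(step, k -> new IntSummaryStatistics()).accept(millis);
  }

  // Returns a read-only view: put/remove on the result throw
  // UnsupportedOperationException, while later record() calls remain visible.
  Map<String, IntSummaryStatistics> getProcessingTimesByStep() {
    return Collections.unmodifiableMap(processingTimesByStep);
  }
}
```

Note that the view only protects the map structure. The `IntSummaryStatistics` values are themselves mutable, so a caller could still call `accept` on them; a defensive copy would be needed to rule that out.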



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTimeUtils.MillisProvider;
+
+public class DataflowExecutionStateSampler extends ExecutionStateSampler {
+
+  private static final MillisProvider SYSTEM_MILLIS_PROVIDER = System::currentTimeMillis;
+  private static final DataflowExecutionStateSampler INSTANCE =
+      new DataflowExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);
+
+  private HashMap<String, DataflowExecutionStateTracker> activeTrackersByWorkId = new HashMap<>();
+  private HashMap<String, Map<String, IntSummaryStatistics>> completedProcessingMetrics =
+      new HashMap<>();
+
+  public static DataflowExecutionStateSampler instance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  public static DataflowExecutionStateSampler newForTest(MillisProvider clock) {
+    return new DataflowExecutionStateSampler(checkNotNull(clock));
+  }
+
+  public DataflowExecutionStateSampler(MillisProvider clock) {
+    super(clock);
+  }
+
+  @Override
+  public void addTracker(ExecutionStateTracker tracker) {
+    if (!(tracker instanceof DataflowExecutionStateTracker)) {
+      return;
+    }
+    DataflowExecutionStateTracker dfTracker = (DataflowExecutionStateTracker) tracker;
+    this.activeTrackersByWorkId.put(dfTracker.getWorkItemId(), dfTracker);
+  }
+
+  private Map<String, IntSummaryStatistics> mergeStepStatsMaps(

Review Comment:
   nit: can this be static? (Not sure whether it references any member
variables or methods.)
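
If the helper only works on its arguments, a static version could look roughly like the sketch below. The diff does not show the method's parameters or body, so the two-map signature and the merge logic here are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.Map;

final class StepStatsSketch {
  private StepStatsSketch() {}

  // Static because it touches no instance state: the result depends only on the inputs.
  // Neither input map nor its values are modified.
  static Map<String, IntSummaryStatistics> mergeStepStatsMaps(
      Map<String, IntSummaryStatistics> first, Map<String, IntSummaryStatistics> second) {
    Map<String, IntSummaryStatistics> merged = new HashMap<>();
    for (Map<String, IntSummaryStatistics> source : Arrays.asList(first, second)) {
      for (Map.Entry<String, IntSummaryStatistics> entry : source.entrySet()) {
        // combine() folds another summary's count, sum, min, and max into this one.
        merged
            .computeIfAbsent(entry.getKey(), k -> new IntSummaryStatistics())
            .combine(entry.getValue());
      }
    }
    return merged;
  }
}
```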



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -101,7 +109,8 @@ private void recordGetWorkStreamLatencies(
     }
   }
 
-  public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
+  public Collection<Windmill.LatencyAttribution> getLatencyAttributions(
+      Boolean isHeartbeat, String workId, DataflowExecutionStateSampler sampler) {

Review Comment:
   unless `isHeartbeat` is nullable, prefer to use the primitive type 
(`boolean`) to the boxed. go/java-practices/primitives#prefer-unboxed



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTimeUtils.MillisProvider;
+
+public class DataflowExecutionStateSampler extends ExecutionStateSampler {

Review Comment:
   nit: Make this class final (go/java-practices/final#classes) unless you need
to mock it. Since this is an implementation, you can mock the parent
class/interface where it's used, so I think final would be good here.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -307,5 +335,34 @@ public Closeable enterState(ExecutionState newState) {
     public String getWorkItemId() {
       return this.workItemId;
     }
+
+    public ActiveMessageMetadata getActiveMessageMetadata() {

Review Comment:
   If this is nullable, it is recommended to make that explicit by returning
`Optional<ActiveMessageMetadata>` [go/java-practices/optional]
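
A short sketch of the `Optional` pattern, with a `String` standing in for `ActiveMessageMetadata` so the example stays self-contained. The class and method names are hypothetical.

```java
import java.util.Optional;

final class TrackerSketch {
  // Null while no message is being processed.
  private String activeStepName;

  void startProcessing(String stepName) {
    activeStepName = stepName;
  }

  void finishProcessing() {
    activeStepName = null;
  }

  // The return type tells callers the value may be absent,
  // so they cannot forget the "nothing active" case.
  Optional<String> getActiveStepName() {
    return Optional.ofNullable(activeStepName);
  }
}
```

A caller might then write `tracker.getActiveStepName().ifPresent(name -> ...)` instead of a null check.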



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -245,6 +248,16 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
     private final ContextActivationObserverRegistry contextActivationObserverRegistry;
     private final String workItemId;
 
+    /**
+     * Metadata on the message whose processing is currently being managed by this tracker. If no
+     * message is actively being processed, activeMessageMetadata will be null.
+     */
+    private ActiveMessageMetadata activeMessageMetadata = null;

Review Comment:
   Annotate with `@Nullable`.
   If thread safety is required, consider using
`AtomicReference<ActiveMessageMetadata>`.
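
A rough sketch of the `AtomicReference` option, again with a `String` standing in for `ActiveMessageMetadata`. The holder class and its method names are hypothetical, not from the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

final class ActiveMessageHolderSketch {
  // Holds null while no message is being processed.
  private final AtomicReference<String> activeStep = new AtomicReference<>(null);

  // Succeeds only if nothing is currently active; the check and the set are one atomic step.
  boolean tryStart(String stepName) {
    return activeStep.compareAndSet(null, stepName);
  }

  // Clears the active step and returns the previous value (null if none was active).
  String finish() {
    return activeStep.getAndSet(null);
  }

  // May return null, so a real field or getter would carry @Nullable.
  String current() {
    return activeStep.get();
  }
}
```

The field itself can then be `final`, since only the referenced value changes.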



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -287,6 +286,8 @@ public class StreamingDataflowWorker {
   // Possibly overridden by streaming engine config.
   private int maxWorkItemCommitBytes = Integer.MAX_VALUE;
 
+  private DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance();

Review Comment:
   This can be final if it is not reassigned.
   
   Also, is `DataflowExecutionStateSampler` a singleton?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.HashMap;
+import java.util.IntSummaryStatistics;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowExecutionStateTracker;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.joda.time.DateTimeUtils.MillisProvider;
+
+public class DataflowExecutionStateSampler extends ExecutionStateSampler {
+
+  private static final MillisProvider SYSTEM_MILLIS_PROVIDER = System::currentTimeMillis;
+  private static final DataflowExecutionStateSampler INSTANCE =
+      new DataflowExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);
+
+  private HashMap<String, DataflowExecutionStateTracker> activeTrackersByWorkId = new HashMap<>();

Review Comment:
   Prefer marking members final unless they can be reassigned.
go/java-practices/final#fields



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java:
##########
@@ -245,6 +248,16 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
     private final ContextActivationObserverRegistry contextActivationObserverRegistry;
     private final String workItemId;
 
+    /**
+     * Metadata on the message whose processing is currently being managed by this tracker. If no
+     * message is actively being processed, activeMessageMetadata will be null.
+     */
+    private ActiveMessageMetadata activeMessageMetadata = null;
+
+    private MillisProvider clock = System::currentTimeMillis;

Review Comment:
   Make member variables final if they are not intended to be reassigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
