rohitsinha54 commented on code in PR #33059:
URL: https://github.com/apache/beam/pull/33059#discussion_r1835282845


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java:
##########
@@ -50,9 +57,59 @@
  * example off how to query metrics.
  */
 public class Metrics {
+  private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);
 
   private Metrics() {}
 
+  static class MetricsFlag {
+    private static final AtomicReference<@Nullable MetricsFlag> INSTANCE = new AtomicReference<>();
+    final boolean counterDisabled;
+    final boolean stringSetDisabled;
+
+    private MetricsFlag(boolean counterDisabled, boolean stringSetDisabled) {
+      this.counterDisabled = counterDisabled;
+      this.stringSetDisabled = stringSetDisabled;
+    }
+
+    static boolean counterDisabled() {
+      MetricsFlag flag = INSTANCE.get();
+      return flag != null && flag.counterDisabled;
+    }
+
+    static boolean stringSetDisabled() {
+      MetricsFlag flag = INSTANCE.get();
+      return flag != null && flag.stringSetDisabled;
+    }
+  }
+
+  /**
+   * Initialize metrics flags if not already done so.
+   *
+   * <p>Should be called by worker at worker harness initialization. Should not be called by user
+   * code (and it does not have an effect as the initialization completed before).
+   */
+  @Internal
+  public static void setDefaultPipelineOptions(PipelineOptions options) {
+    MetricsFlag flag = MetricsFlag.INSTANCE.get();
+    if (flag == null) {
+      ExperimentalOptions exp = options.as(ExperimentalOptions.class);
+      boolean counterDisabled = ExperimentalOptions.hasExperiment(exp, "disableCounterMetrics");
+      if (counterDisabled) {
+        LOG.info("Counter metrics are disabled.");
+      }
+      boolean stringSetDisabled = ExperimentalOptions.hasExperiment(exp, "disableStringSetMetrics");

Review Comment:
   With this, if a streaming customer wants to disable StringSet or Lineage in their job, they will need to restart the job and pass this experiment.
   
   On the service side we have an experiment to enable lineage (disabled by default). It is already present in the pipeline options (https://screenshot.googleplex.com/7WaPSvq85nHyCEt). I was thinking we should keep lineage metric reporting disabled by default unless a job enables lineage.
   
   We have identified some cases where, for long-running streaming jobs, the lineage information can grow very large over time, so ideally we would only capture it for jobs that opt in to lineage.
   
   Since you are doing the same thing in this PR, could I ask you to add a
   ```
   Lineage.setDefaultPipelineOptions(options);
   ```
   call in the file above, and a similar function in the Lineage class that sets a flag based on the `enable_lineage` experiment mentioned above, so that `add` in the Lineage class just returns as a no-op if lineage is not enabled?
   
   Thank you in advance. 
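A minimal plain-Java sketch of the gating requested above, assuming a set-once flag like `MetricsFlag` in this PR. `LineageSketch`, `setDefaultExperiments`, `resetForTesting` and the in-memory `RECORDED` set are hypothetical stand-ins, not Beam APIs; a real `Lineage.setDefaultPipelineOptions` would presumably read the experiment through `ExperimentalOptions.hasExperiment`, as `Metrics` does in the diff above, and forward to the actual lineage metric instead of a local set.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: lineage recording gated on the "enable_lineage" experiment. */
final class LineageSketch {
  // null means "not initialized yet", mirroring MetricsFlag.INSTANCE in the diff.
  private static final AtomicReference<Boolean> ENABLED = new AtomicReference<>();

  // Hypothetical in-memory sink standing in for the real lineage metric.
  private static final Set<String> RECORDED = ConcurrentHashMap.newKeySet();

  private LineageSketch() {}

  /** Intended to be called once at worker harness startup; later calls have no effect. */
  static void setDefaultExperiments(Collection<String> experiments) {
    boolean enabled = experiments != null && experiments.contains("enable_lineage");
    ENABLED.compareAndSet(null, enabled);
  }

  /** Lineage stays off unless initialization ran with the experiment present. */
  static boolean enabled() {
    return Boolean.TRUE.equals(ENABLED.get());
  }

  /** No-op unless lineage was enabled at startup. */
  static void add(String fqn) {
    if (!enabled()) {
      return;
    }
    RECORDED.add(fqn);
  }

  /** Snapshot of what has been recorded so far. */
  static Set<String> recorded() {
    return new HashSet<>(RECORDED);
  }

  /** Test-only: return to the uninitialized state. */
  static void resetForTesting() {
    ENABLED.set(null);
    RECORDED.clear();
  }
}
```

In this sketch an uninitialized flag counts as "disabled", so lineage is strictly opt-in: any code path that never runs the worker initialization records nothing.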



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java:
##########
@@ -567,14 +568,17 @@ static FileSystem getFileSystemInternal(String scheme) {
    *
    * <p>Outside of workers where Beam FileSystem API is used (e.g. test methods, user code executed
    * during pipeline submission), consider use {@link #registerFileSystemsOnce} if initialize
-   * FIleSystem of supported schema is the main goal.
+   * FileSystem of supported schema is the main goal.
    */
   @Internal
   public static void setDefaultPipelineOptions(PipelineOptions options) {
     checkNotNull(options, "options cannot be null");
     long id = options.getOptionsId();
     int nextRevision = options.revision();
 
+    // entry to set other PipelineOption determined flags
+    Metrics.setDefaultPipelineOptions(options);

Review Comment:
   This makes `FileSystems.setDefaultPipelineOptions` a little overloaded, no? It seems odd that we set metrics flags in FileSystems, but I see you want to do this in a centralized place.
   Would it be better to call it separately in DataflowBatchWorkerHarness.java, StreamingDataflowWorker.java and FnHarness.java, from where all of this is called?
   
   (I do not have a preference, just a thought. Feel free to ignore this comment if you want to do it this way.)
    



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java:
##########
@@ -174,6 +231,9 @@ private DelegatingStringSet(MetricName name) {
 
     @Override
     public void add(String value) {
+      if (MetricsFlag.stringSetDisabled()) {

Review Comment:
   Should this part have a unit test?
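A plain-Java sketch of the cases such a test could cover, assuming the set-once semantics of `MetricsFlag` in the diff. Because `MetricsFlag.INSTANCE` is private and only initialized while it is still null, a real test would likely need a test-only reset hook. `StringSetFlagModel`, `RecordingStringSet`, `StringSetFlagCheck` and `resetForTesting` are hypothetical stand-ins, not Beam classes; a JUnit test against `Metrics` would follow the same three cases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the string-set part of Metrics.MetricsFlag. */
final class StringSetFlagModel {
  // null = not initialized, which the diff treats as "not disabled".
  private static final AtomicReference<Boolean> DISABLED = new AtomicReference<>();

  private StringSetFlagModel() {}

  static boolean stringSetDisabled() {
    return Boolean.TRUE.equals(DISABLED.get());
  }

  /** Set-once, like setDefaultPipelineOptions: only the first call has an effect. */
  static void init(boolean disabled) {
    DISABLED.compareAndSet(null, disabled);
  }

  /** Hypothetical test-only hook; without it the flag would persist across tests in one JVM. */
  static void resetForTesting() {
    DISABLED.set(null);
  }
}

/** Hypothetical stand-in for DelegatingStringSet that records values in a list. */
final class RecordingStringSet {
  final List<String> values = new ArrayList<>();

  void add(String value) {
    if (StringSetFlagModel.stringSetDisabled()) {
      return; // Disabled by experiment: drop the value.
    }
    values.add(value);
  }
}

/** The three cases a unit test for the new branch would likely check. */
final class StringSetFlagCheck {
  private StringSetFlagCheck() {}

  static void run() {
    // 1. Flag never initialized: values are kept.
    StringSetFlagModel.resetForTesting();
    RecordingStringSet unset = new RecordingStringSet();
    unset.add("a");
    check(unset.values.size() == 1, "uninitialized flag must not drop values");

    // 2. Disabled at initialization: add() is a no-op.
    StringSetFlagModel.resetForTesting();
    StringSetFlagModel.init(true);
    RecordingStringSet disabled = new RecordingStringSet();
    disabled.add("a");
    check(disabled.values.isEmpty(), "disabled flag must drop values");

    // 3. A later init call cannot re-enable it.
    StringSetFlagModel.init(false);
    disabled.add("b");
    check(disabled.values.isEmpty(), "second init must be ignored");

    StringSetFlagModel.resetForTesting();
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}
```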



-- 
This is an automated message from the Apache Git Service.