[ 
https://issues.apache.org/jira/browse/GOBBLIN-1116?focusedWorklogId=423099&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-423099
 ]

ASF GitHub Bot logged work on GOBBLIN-1116:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Apr/20 22:01
            Start Date: 15/Apr/20 22:01
    Worklog Time Spent: 10m 
      Work Description: autumnust commented on pull request #2956: 
GOBBLIN-1116: Avoid registering schema with schema registry during Me…
URL: https://github.com/apache/incubator-gobblin/pull/2956#discussion_r409157277
 
 

 ##########
 File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 ##########
 @@ -919,6 +935,87 @@ private void sendEmailOnShutdown(Optional<ApplicationReport> applicationReport)
     }
   }
 
+  private static Config addDynamicConfig(Config config) throws IOException {
+    if (isKafkaReportingEnabled(config) && isKafkaAvroSchemaRegistryEnabled(config)) {
+      KafkaAvroSchemaRegistry registry = new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config));
+      return addMetricReportingDynamicConfig(config, registry);
+    } else {
+      return config;
+    }
+  }
+
+  /**
+   * Write the config to the file specified with the config key {@value GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it
+   * is configured.
+   * @param config the config to output
+   * @throws IOException
+   */
+  @VisibleForTesting
+  static void outputConfigToFile(Config config)
+      throws IOException {
+    // If a file path is specified then write the Azkaban config to that path in HOCON format.
+    // This can be used to generate an application.conf file to pass to the yarn app master and containers.
+    if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) {
+      File configFile = new File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH));
+      File parentDir = configFile.getParentFile();
+
+      if (parentDir != null && !parentDir.exists()) {
+        if (!parentDir.mkdirs()) {
+          throw new IOException("Error creating directories for " + parentDir);
+        }
+      }
+
+      ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
+      configRenderOptions = configRenderOptions.setComments(false);
+      configRenderOptions = configRenderOptions.setOriginComments(false);
+      configRenderOptions = configRenderOptions.setFormatted(true);
+      configRenderOptions = configRenderOptions.setJson(false);
+
+      String renderedConfig = config.root().render(configRenderOptions);
+
+      FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
+    }
+  }
+
+  /**
+   * A method that adds dynamic config related to Kafka-based metric reporting. In particular, if Kafka based metric
+   * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured, this method registers the metric reporting
+   * related schemas and adds the returned schema ids to the config to be used by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster}
+   * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The advantage of doing this is that the TaskRunners
+   * do not have to initiate a connection with the schema registry server and reduces the chances of metric reporter
+   * instantiation failures in the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s.
+   * @param config
+   */
+  @VisibleForTesting
+  static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegistry registry) throws IOException {
+    Properties properties = ConfigUtils.configToProperties(config);
+    if (KafkaReporterUtils.isEventsEnabled(properties)) {
+      Schema schema = new Schema.Parser()
 
 Review comment:
   This code block could be shared with the next one, but I am fine with 
keeping it as is if you find the current way more readable.
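
   To illustrate the kind of reuse suggested here, below is a minimal sketch, 
not the PR's actual code. The "next" block is not visible in this excerpt, so 
the sketch assumes it is an analogous register-and-store step for a second 
reporter type. The class name, the config keys, the schema strings, and the 
use of a plain Map and Function in place of Config and KafkaAvroSchemaRegistry 
are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a Map stands in for Config and a Function stands in
// for the schema registry. None of these names come from Gobblin.
final class SchemaIdConfigSketch {

  /**
   * Registers one schema and returns a copy of the config with the returned
   * id stored under idKey. This is the step both blocks would share.
   */
  static Map<String, String> withRegisteredSchemaId(Map<String, String> config, String idKey,
      String schemaText, Function<String, String> registry) {
    Map<String, String> updated = new HashMap<>(config);
    updated.put(idKey, registry.apply(schemaText));
    return updated;
  }

  /** Calls the shared helper once per enabled reporter type instead of duplicating the block. */
  static Map<String, String> addSchemaIds(Map<String, String> config, boolean eventsEnabled,
      boolean metricsEnabled, Function<String, String> registry) {
    Map<String, String> result = config;
    if (eventsEnabled) {
      // Assumed key and schema text, for illustration only.
      result = withRegisteredSchemaId(result, "sketch.events.schemaId", "events-schema", registry);
    }
    if (metricsEnabled) {
      result = withRegisteredSchemaId(result, "sketch.metrics.schemaId", "metrics-schema", registry);
    }
    return result;
  }
}
```

   Whether this reads better than two explicit blocks is a judgment call; the 
helper mainly pays off if more reporter types are added later.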
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 423099)
    Time Spent: 1h  (was: 50m)

> Avoid registering schema with schema registry during MetricReporting 
> initialization from cluster workers
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1116
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1116
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-cluster
>    Affects Versions: 0.15.0
>            Reporter: Sudarshan Vasudevan
>            Assignee: Hung Tran
>            Priority: Major
>             Fix For: 0.15.0
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> This PR avoids having to register schema with schema registry on Kafka metric 
> reporter initialization from cluster workers. This avoids a potentially large 
> number of connections that may be initiated with Kafka Schema registry on 
> Gobblin on Yarn application launch. 
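
The effect described above can be sketched as follows. This is a simplified 
model under stated assumptions, not Gobblin code: the registry is a counting 
stand-in, the config is a plain Map, the key name is hypothetical, and the 
worker's fallback to registering on its own when the id is absent is an 
assumption about behavior that this message does not confirm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of launcher-side registration; names are illustrative only.
final class LauncherSideRegistrationSketch {

  /** Stand-in for a schema registry that counts the registration calls it receives. */
  static final class CountingRegistry {
    final AtomicInteger calls = new AtomicInteger();

    String register(String schemaText) {
      calls.incrementAndGet();
      return "id-" + Integer.toHexString(schemaText.hashCode());
    }
  }

  // Hypothetical config key under which the launcher publishes the schema id.
  static final String SCHEMA_ID_KEY = "sketch.metrics.schemaId";

  /** Launcher: register the schema once and publish the id through the config. */
  static Map<String, String> launch(Map<String, String> config, CountingRegistry registry,
      String schemaText) {
    Map<String, String> updated = new HashMap<>(config);
    updated.put(SCHEMA_ID_KEY, registry.register(schemaText));
    return updated;
  }

  /** Worker: use the id from the config if present; contact the registry only if it is absent. */
  static String workerSchemaId(Map<String, String> config, CountingRegistry registry,
      String schemaText) {
    String id = config.get(SCHEMA_ID_KEY);
    return id != null ? id : registry.register(schemaText);
  }
}
```

In this model the registry sees a single call from the launcher no matter how 
many workers start, whereas workers started without the pre-populated id 
would each make their own call.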



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
