autumnust commented on a change in pull request #2956: GOBBLIN-1116: Avoid 
registering schema with schema registry during Me…
URL: https://github.com/apache/incubator-gobblin/pull/2956#discussion_r409157891
 
 

 ##########
 File path: 
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 ##########
 @@ -919,6 +935,87 @@ private void 
sendEmailOnShutdown(Optional<ApplicationReport> applicationReport)
     }
   }
 
+  private static Config addDynamicConfig(Config config) throws IOException {
+    if (isKafkaReportingEnabled(config) && 
isKafkaAvroSchemaRegistryEnabled(config)) {
+      KafkaAvroSchemaRegistry registry = new 
KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config));
+      return addMetricReportingDynamicConfig(config, registry);
+    } else {
+      return config;
+    }
+  }
+
+  /**
+   * Write the config to the file specified with the config key {@value 
GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it
+   * is configured.
+   * @param config the config to output
+   * @throws IOException
+   */
+  @VisibleForTesting
+  static void outputConfigToFile(Config config)
+      throws IOException {
+    // If a file path is specified then write the Azkaban config to that path 
in HOCON format.
+    // This can be used to generate an application.conf file to pass to the 
yarn app master and containers.
+    if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) {
+      File configFile = new 
File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH));
+      File parentDir = configFile.getParentFile();
+
+      if (parentDir != null && !parentDir.exists()) {
+        if (!parentDir.mkdirs()) {
+          throw new IOException("Error creating directories for " + parentDir);
+        }
+      }
+
+      ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults();
+      configRenderOptions = configRenderOptions.setComments(false);
+      configRenderOptions = configRenderOptions.setOriginComments(false);
+      configRenderOptions = configRenderOptions.setFormatted(true);
+      configRenderOptions = configRenderOptions.setJson(false);
+
+      String renderedConfig = config.root().render(configRenderOptions);
+
+      FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8);
+    }
+  }
+
+  /**
+   * A method that adds dynamic config related to Kafka-based metric 
reporting. In particular, if Kafka based metric
+   * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured, 
this method registers the metric reporting
+   * related schemas and adds the returned schema ids to the config to be used 
by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster}
+   * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The 
advantage of doing this is that the TaskRunners
+   * do not have to initiate a connection with the schema registry server and 
reduces the chances of metric reporter
+   * instantiation failures in the {@link 
org.apache.gobblin.cluster.GobblinTaskRunner}s.
+   * @param config
+   */
+  @VisibleForTesting
+  static Config addMetricReportingDynamicConfig(Config config, 
KafkaAvroSchemaRegistry registry) throws IOException {
+    Properties properties = ConfigUtils.configToProperties(config);
+    if (KafkaReporterUtils.isEventsEnabled(properties)) {
+      Schema schema = new Schema.Parser()
+          
.parse(GobblinYarnAppLauncher.class.getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc"));
+      String schemaId = registry.register(schema, 
KafkaReporterUtils.getEventsTopic(properties).get());
+      config = 
config.withValue(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID,
+          ConfigValueFactory.fromAnyRef(schemaId));
+    }
+
+    if (KafkaReporterUtils.isMetricsEnabled(properties)) {
+      Schema schema = new Schema.Parser()
+          
.parse(GobblinYarnAppLauncher.class.getClassLoader().getResourceAsStream("MetricReport.avsc"));
+      String schemaId = registry.register(schema, 
KafkaReporterUtils.getMetricsTopic(properties).get());
+      config = 
config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID,
+          ConfigValueFactory.fromAnyRef(schemaId));
+    }
+    return config;
+  }
+
+  private static boolean isKafkaReportingEnabled(Config config) {
 
 Review comment:
   Can this be moved into the util class and made public? KafkaReporterFactory 
is also calling something similar.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to