autumnust commented on a change in pull request #2956: GOBBLIN-1116: Avoid registering schema with schema registry during Me… URL: https://github.com/apache/incubator-gobblin/pull/2956#discussion_r409157891
########## File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java ########## @@ -919,6 +935,87 @@ private void sendEmailOnShutdown(Optional<ApplicationReport> applicationReport) } } + private static Config addDynamicConfig(Config config) throws IOException { + if (isKafkaReportingEnabled(config) && isKafkaAvroSchemaRegistryEnabled(config)) { + KafkaAvroSchemaRegistry registry = new KafkaAvroSchemaRegistry(ConfigUtils.configToProperties(config)); + return addMetricReportingDynamicConfig(config, registry); + } else { + return config; + } + } + + /** + * Write the config to the file specified with the config key {@value GOBBLIN_YARN_CONFIG_OUTPUT_PATH} if it + * is configured. + * @param config the config to output + * @throws IOException + */ + @VisibleForTesting + static void outputConfigToFile(Config config) + throws IOException { + // If a file path is specified then write the Azkaban config to that path in HOCON format. + // This can be used to generate an application.conf file to pass to the yarn app master and containers. 
+ if (config.hasPath(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)) { + File configFile = new File(config.getString(GOBBLIN_YARN_CONFIG_OUTPUT_PATH)); + File parentDir = configFile.getParentFile(); + + if (parentDir != null && !parentDir.exists()) { + if (!parentDir.mkdirs()) { + throw new IOException("Error creating directories for " + parentDir); + } + } + + ConfigRenderOptions configRenderOptions = ConfigRenderOptions.defaults(); + configRenderOptions = configRenderOptions.setComments(false); + configRenderOptions = configRenderOptions.setOriginComments(false); + configRenderOptions = configRenderOptions.setFormatted(true); + configRenderOptions = configRenderOptions.setJson(false); + + String renderedConfig = config.root().render(configRenderOptions); + + FileUtils.writeStringToFile(configFile, renderedConfig, Charsets.UTF_8); + } + } + + /** + * A method that adds dynamic config related to Kafka-based metric reporting. In particular, if Kafka based metric + * reporting is enabled and {@link KafkaAvroSchemaRegistry} is configured, this method registers the metric reporting + * related schemas and adds the returned schema ids to the config to be used by metric reporters in {@link org.apache.gobblin.yarn.GobblinApplicationMaster} + * and the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. The advantage of doing this is that the TaskRunners + * do not have to initiate a connection with the schema registry server and reduces the chances of metric reporter + * instantiation failures in the {@link org.apache.gobblin.cluster.GobblinTaskRunner}s. 
+ * @param config + */ + @VisibleForTesting + static Config addMetricReportingDynamicConfig(Config config, KafkaAvroSchemaRegistry registry) throws IOException { + Properties properties = ConfigUtils.configToProperties(config); + if (KafkaReporterUtils.isEventsEnabled(properties)) { + Schema schema = new Schema.Parser() + .parse(GobblinYarnAppLauncher.class.getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")); + String schemaId = registry.register(schema, KafkaReporterUtils.getEventsTopic(properties).get()); + config = config.withValue(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKA_AVRO_SCHEMA_ID, + ConfigValueFactory.fromAnyRef(schemaId)); + } + + if (KafkaReporterUtils.isMetricsEnabled(properties)) { + Schema schema = new Schema.Parser() + .parse(GobblinYarnAppLauncher.class.getClassLoader().getResourceAsStream("MetricReport.avsc")); + String schemaId = registry.register(schema, KafkaReporterUtils.getMetricsTopic(properties).get()); + config = config.withValue(ConfigurationKeys.METRICS_REPORTING_METRICS_KAFKA_AVRO_SCHEMA_ID, + ConfigValueFactory.fromAnyRef(schemaId)); + } + return config; + } + + private static boolean isKafkaReportingEnabled(Config config) { Review comment: Can this be moved into the util class and made public? KafkaReporterFactory also calls something similar. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services