Copilot commented on code in PR #9542: URL: https://github.com/apache/gravitino/pull/9542#discussion_r2646782666
########## core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java: ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.job; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URLClassLoader; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.CreateMetalakeEvent; +import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.JobTemplateEntity; +import org.apache.gravitino.metalake.MetalakeManager; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.apache.gravitino.utils.NamespaceUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Event listener that automatically registers built-in job templates when metalakes are created. + * + * <p>This listener monitors metalake creation events and registers all discovered built-in job + * templates (via JobTemplateProvider SPI) into the newly created metalake. It also handles + * registration for existing metalakes on first startup. + */ +public class BuiltInJobTemplateEventListener implements EventListenerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(BuiltInJobTemplateEventListener.class); + + private static final Pattern BUILTIN_JOB_TEMPLATE_NAME_PATTERN = + Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN); + private static final Pattern VERSION_PATTERN = + Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN); + + private final JobManager jobManager; + private final EntityStore entityStore; + private final IdGenerator idGenerator; + + public BuiltInJobTemplateEventListener( + JobManager jobManager, EntityStore entityStore, IdGenerator idGenerator) { + this.jobManager = jobManager; + this.entityStore = entityStore; + this.idGenerator = idGenerator; + } + + @Override + public void init(Map<String, String> properties) throws RuntimeException { + // Dependencies will be set via setDependencies() method + // This is called from EventListenerManager before start() + } + + @Override + public void start() throws RuntimeException { + // Register built-in job templates for all existing metalakes on first startup + try { + List<String> existingMetalakes = MetalakeManager.listInUseMetalakes(entityStore); + if (existingMetalakes.isEmpty()) { + return; + } + + LOG.info( + "Registering built-in job templates for {} existing metalakes", existingMetalakes.size()); + + Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates(); + if (builtInTemplates.isEmpty()) { + LOG.info("No built-in job templates discovered via JobTemplateProvider"); + return; + } + + existingMetalakes.forEach( + metalake -> { + try { + reconcileBuiltInJobTemplates(metalake, builtInTemplates); + } catch (Exception e) { + LOG.error("Failed to register built-in job templates for metalake: {}", metalake, e); + } + }); + + } catch (Exception e) { + LOG.error("Failed to register built-in job templates for existing metalakes", e); + } + } + + @Override + public void stop() throws RuntimeException { + // No resources to clean up + } + + @Override + public void onPostEvent(Event postEvent) throws RuntimeException { + if (postEvent instanceof CreateMetalakeEvent) { + CreateMetalakeEvent event = (CreateMetalakeEvent) postEvent; + String metalakeName = event.identifier().name(); + + try { + Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates(); + if (builtInTemplates.isEmpty()) { + LOG.debug("No built-in job templates to register for metalake: {}", metalakeName); + return; + } + + reconcileBuiltInJobTemplates(metalakeName, builtInTemplates); + LOG.info("Registered built-in job templates for metalake: {}", metalakeName); + } catch (Exception e) { + LOG.error("Failed to register built-in job templates for metalake: {}", metalakeName, e); + } + } + } + + @Override + public Mode mode() { + // Use async isolated to avoid blocking metalake creation + return Mode.ASYNC_ISOLATED; + } + + @VisibleForTesting + Map<String, JobTemplate> loadBuiltInJobTemplates() { + Map<String, JobTemplate> builtInTemplates = Maps.newHashMap(); + + // Load from auxlib directory if available + ClassLoader auxlibClassLoader = createAuxlibClassLoader(); + ServiceLoader<JobTemplateProvider> loader = + ServiceLoader.load(JobTemplateProvider.class, auxlibClassLoader); + + loader.forEach( + provider -> + provider + .jobTemplates() + .forEach( + template -> { + if (!isValidBuiltInJobTemplate(template)) { + LOG.warn("Skip invalid built-in job template {}", template.name()); + return; + } + + JobTemplate existing = builtInTemplates.get(template.name()); + int newVersion = version(template.customFields()); + int existingVersion = + Optional.ofNullable(existing) + .map(jt -> version(jt.customFields())) + .orElse(0); + if (existing == null || newVersion > existingVersion) { + builtInTemplates.put(template.name(), template); + } + })); + return builtInTemplates; + } + + private ClassLoader createAuxlibClassLoader() { + String gravitinoHome = System.getenv("GRAVITINO_HOME"); + if (gravitinoHome == null) { + LOG.warn("GRAVITINO_HOME not set, using current classloader for built-in job templates"); + return Thread.currentThread().getContextClassLoader(); + } + + File auxlibDir = new File(gravitinoHome, "auxlib"); + if (!auxlibDir.exists() || !auxlibDir.isDirectory()) { + LOG.warn( + "Auxlib directory {} does not exist, using current classloader for built-in job templates", + auxlibDir.getAbsolutePath()); + return Thread.currentThread().getContextClassLoader(); + } + + // Only load gravitino-jobs-*.jar files + File[] jarFiles = + auxlibDir.listFiles( + (dir, name) -> name.startsWith("gravitino-jobs-") && name.endsWith(".jar")); + if (jarFiles == null || jarFiles.length == 0) { + LOG.info( + "No gravitino-jobs JAR files found in auxlib directory {}", auxlibDir.getAbsolutePath()); + return Thread.currentThread().getContextClassLoader(); + } + + try { + URI[] jarUris = Arrays.stream(jarFiles).map(File::toURI).toArray(URI[]::new); + LOG.info( + "Loading built-in job templates from {} gravitino-jobs JAR file(s) in auxlib directory", + jarUris.length); + return new URLClassLoader( + Arrays.stream(jarUris) + .map( + uri -> { + try { + return uri.toURL(); + } catch (MalformedURLException e) { + throw new RuntimeException("Failed to convert URI to URL: " + uri, e); + } + }) + .toArray(java.net.URL[]::new), + Thread.currentThread().getContextClassLoader()); Review Comment: The URLClassLoader created here is never closed, which can lead to resource leaks. URLClassLoaders should be closed when no longer needed to release file handles and prevent memory leaks. Consider storing the classloader as an instance variable and closing it in the stop() method, or use a try-with-resources block if the classloader is only needed temporarily. ########## core/src/main/java/org/apache/gravitino/job/BuiltInJobTemplateEventListener.java: ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.job; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URLClassLoader; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Entity; +import org.apache.gravitino.EntityStore; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.JobTemplateAlreadyExistsException; +import org.apache.gravitino.exceptions.NoSuchEntityException; +import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.CreateMetalakeEvent; +import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.lock.LockType; +import org.apache.gravitino.lock.TreeLockUtils; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.JobTemplateEntity; +import org.apache.gravitino.metalake.MetalakeManager; +import org.apache.gravitino.storage.IdGenerator; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.apache.gravitino.utils.NamespaceUtil; +import org.apache.gravitino.utils.PrincipalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Event listener that automatically registers built-in job templates when metalakes are created. + * + * <p>This listener monitors metalake creation events and registers all discovered built-in job + * templates (via JobTemplateProvider SPI) into the newly created metalake. It also handles + * registration for existing metalakes on first startup. + */ +public class BuiltInJobTemplateEventListener implements EventListenerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(BuiltInJobTemplateEventListener.class); + + private static final Pattern BUILTIN_JOB_TEMPLATE_NAME_PATTERN = + Pattern.compile(JobTemplateProvider.BUILTIN_NAME_PATTERN); + private static final Pattern VERSION_PATTERN = + Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN); + + private final JobManager jobManager; + private final EntityStore entityStore; + private final IdGenerator idGenerator; + + public BuiltInJobTemplateEventListener( + JobManager jobManager, EntityStore entityStore, IdGenerator idGenerator) { + this.jobManager = jobManager; + this.entityStore = entityStore; + this.idGenerator = idGenerator; + } + + @Override + public void init(Map<String, String> properties) throws RuntimeException { + // Dependencies will be set via setDependencies() method + // This is called from EventListenerManager before start() + } + + @Override + public void start() throws RuntimeException { + // Register built-in job templates for all existing metalakes on first startup + try { + List<String> existingMetalakes = MetalakeManager.listInUseMetalakes(entityStore); + if (existingMetalakes.isEmpty()) { + return; + } + + LOG.info( + "Registering built-in job templates for {} existing metalakes", existingMetalakes.size()); + + Map<String, JobTemplate> builtInTemplates = loadBuiltInJobTemplates(); + if (builtInTemplates.isEmpty()) { + LOG.info("No built-in job templates discovered via JobTemplateProvider"); + return; + } + + existingMetalakes.forEach( + metalake -> { + try { + reconcileBuiltInJobTemplates(metalake, builtInTemplates); + } catch (Exception e) { + LOG.error("Failed to register built-in job templates for metalake: {}", metalake, e); + } + }); + + } catch (Exception e) { + LOG.error("Failed to register built-in job templates for existing metalakes", e); + } + } + + @Override + public void stop() throws RuntimeException { + // No resources to clean up Review Comment: The stop() method does not close the URLClassLoader that may have been created in createAuxlibClassLoader(). If the classloader was created and cached, it should be closed here to prevent resource leaks. Consider storing the classloader as an instance variable and closing it in stop(). ```suggestion if (auxlibClassLoader != null) { try { auxlibClassLoader.close(); } catch (IOException e) { LOG.warn("Failed to close auxiliary job template classloader", e); } finally { auxlibClassLoader = null; } } ``` ########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.maintenance.jobs.spark; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.gravitino.job.JobTemplateProvider; +import org.apache.gravitino.job.SparkJobTemplate; +import org.apache.gravitino.maintenance.jobs.BuiltInJob; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +/** + * Self-contained Spark Pi program for the built-in SparkPi job template. + * + * <p>This avoids depending on the Spark examples jar so the built-in template can run with only the + * Gravitino-provided jobs artifact on the classpath. + */ +public class SparkPiJob implements BuiltInJob { + + private static final String NAME = JobTemplateProvider.BUILTIN_NAME_PREFIX + "sparkpi"; + // Bump VERSION whenever SparkPi template behavior changes (name/executable/class/args/configs). + private static final String VERSION = "v1"; + + @Override + public SparkJobTemplate jobTemplate() { + return SparkJobTemplate.builder() + .withName(NAME) + .withComment("Built-in SparkPi job template") + .withExecutable(resolveExecutable()) + .withClassName(SparkPiJob.class.getName()) + .withArguments(Collections.singletonList("{{slices}}")) + .withConfigs(buildSparkConfigs()) + .withCustomFields( + Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION)) + .build(); + } + + public static void main(String[] args) { + int slices = args.length > 0 ? Integer.parseInt(args[0]) : 2; + int samples = Math.max(slices, 1) * 100000; + + SparkSession spark = + SparkSession.builder() + .appName("Gravitino Built-in SparkPi") + // Rely on external cluster/master configuration + .getOrCreate(); + + JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD<Integer> rdd = + jsc.parallelize(IntStream.range(0, samples).boxed().collect(Collectors.toList()), slices); + long count = + rdd.filter( + i -> { + double x = Math.random() * 2 - 1; + double y = Math.random() * 2 - 1; + return x * x + y * y <= 1; + }) + .count(); + + double pi = 4.0 * count / samples; + System.out.printf("Pi is roughly %.5f%n", pi); Review Comment: Using System.out.printf in production code violates Apache Gravitino coding guidelines which require proper logging instead of System.out.println or System.out.printf. This output should use a logger (e.g., LOG.info) to ensure consistent logging practices and proper log level control. ########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJob.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.maintenance.jobs; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.gravitino.job.JobTemplate; + +/** Contract for built-in jobs to expose their job template definitions. */ +public interface BuiltInJob { + + /** Returns the built-in job template provided by this job. */ + JobTemplate jobTemplate(); + + /** Resolve the executable jar that hosts the built-in jobs. */ + default String resolveExecutable() { + try { + Path path = + Paths.get(BuiltInJob.class.getProtectionDomain().getCodeSource().getLocation().toURI()); Review Comment: The method comment says "Resolve the executable jar" but the implementation returns the location of the BuiltInJob interface class, not necessarily the implementing class. If the interface and implementation are in different JARs or classloaders, this could return the wrong path. Consider resolving based on the actual implementation class: this.getClass().getProtectionDomain().getCodeSource().getLocation() ```suggestion java.security.CodeSource codeSource = this.getClass().getProtectionDomain().getCodeSource(); if (codeSource == null || codeSource.getLocation() == null) { throw new RuntimeException("Failed to resolve built-in jobs jar location: no code source"); } Path path = Paths.get(codeSource.getLocation().toURI()); ``` ########## maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/spark/SparkPiJob.java: ########## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.maintenance.jobs.spark; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.gravitino.job.JobTemplateProvider; +import org.apache.gravitino.job.SparkJobTemplate; +import org.apache.gravitino.maintenance.jobs.BuiltInJob; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +/** + * Self-contained Spark Pi program for the built-in SparkPi job template. + * + * <p>This avoids depending on the Spark examples jar so the built-in template can run with only the + * Gravitino-provided jobs artifact on the classpath. + */ +public class SparkPiJob implements BuiltInJob { + + private static final String NAME = JobTemplateProvider.BUILTIN_NAME_PREFIX + "sparkpi"; + // Bump VERSION whenever SparkPi template behavior changes (name/executable/class/args/configs). + private static final String VERSION = "v1"; + + @Override + public SparkJobTemplate jobTemplate() { + return SparkJobTemplate.builder() + .withName(NAME) + .withComment("Built-in SparkPi job template") + .withExecutable(resolveExecutable()) + .withClassName(SparkPiJob.class.getName()) + .withArguments(Collections.singletonList("{{slices}}")) + .withConfigs(buildSparkConfigs()) + .withCustomFields( + Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION)) + .build(); + } + + public static void main(String[] args) { + int slices = args.length > 0 ? Integer.parseInt(args[0]) : 2; Review Comment: The parseInt call is not protected against NumberFormatException. If the user provides a non-numeric argument, this will crash the job with an uncaught exception. Consider wrapping this in a try-catch block and providing a meaningful error message, or validating the input before parsing. ```suggestion int slices = 2; if (args.length > 0) { try { slices = Integer.parseInt(args[0]); } catch (NumberFormatException e) { System.err.printf( "Invalid slices argument '%s'. Using default value %d.%n", args[0], slices); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
