jack-moseley commented on a change in pull request #2620: [GOBBLIN-756] Add flow catalog that updates when filesystem is modified
URL: https://github.com/apache/incubator-gobblin/pull/2620#discussion_r279477154
 
 

 ##########
 File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatingFSFlowTemplateCatalog.java
 ##########
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
+
+
+/**
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates. It has a
+ * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} on the root path, and clears the cache when a change
+ * is detected so that the next call to {@link #getFlowTemplate(URI)} will use the updated files.
+ */
+@Slf4j
+public class UpdatingFSFlowTemplateCatalog extends FSFlowTemplateCatalog {
+  private static Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
+  private static Map<URI, List<JobTemplate>> jobTemplateMap = new ConcurrentHashMap<>();
+
+  public UpdatingFSFlowTemplateCatalog(Config sysConfig) throws IOException {
+    super(sysConfig);
+  }
+
+  @Override
+  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+      throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
+    if (!flowTemplateMap.containsKey(flowTemplateDirURI)) {
+      flowTemplateMap.put(flowTemplateDirURI, super.getFlowTemplate(flowTemplateDirURI));
+    }
+    return flowTemplateMap.get(flowTemplateDirURI);
+  }
+
+  @Override
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    if (!jobTemplateMap.containsKey(flowTemplateDirURI)) {
+      jobTemplateMap.put(flowTemplateDirURI, super.getJobTemplatesForFlow(flowTemplateDirURI));
+    }
+    return jobTemplateMap.get(flowTemplateDirURI);
+  }
+
+  @Override
+  protected PathAlterationListener getListener() {
+    return new FlowCatalogPathAlterationListener();
+  }
+
+  @Override
+  protected void startUp() throws IOException {
+    if (this.pathAlterationDetector != null) {
+      this.pathAlterationDetector.start();
+    }
+  }
+
+  /**
+   * Clear cached templates so they will be reloaded next time {@link #getFlowTemplate(URI)} is called.
+   */
+  public void clearTemplates() {
+    log.info("Change detected, reloading flow templates.");
+    flowTemplateMap.clear();
+    jobTemplateMap.clear();
+  }
+
+  /**
+   * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that clears job template cache if a file is
+   * created or updated.
+   */
+  private class FlowCatalogPathAlterationListener extends PathAlterationListenerAdaptor {
+    @Override
+    public void onFileCreate(Path path) {
+      clearTemplates();
 
 Review comment:
   In most cases we wouldn't need to clear the cache when a file is added, but I
think there could be edge cases where it would be a problem. Let's say we have a
flow template that points to a job template that does not exist. If we then
create that job template (without modifying anything else), we need to reload
the flow template. Another case would be deleting and recreating a changed
template file instead of modifying it.
   
   These are pretty small edge cases, but reloading all templates is not very
expensive, so I think it's better to leave it like this.
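
   The first edge case can be illustrated with a minimal sketch in plain Java,
with no Gobblin dependencies. `TemplateCacheSketch`, `isResolved`, and
`createFile` are hypothetical names, and the in-memory set only stands in for
the template directory; this is an illustration under those assumptions, not
the catalog's actual behavior.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a caching template catalog; not the Gobblin API. */
class TemplateCacheSketch {
  // Simulated filesystem: the job template files that currently exist.
  private final Set<String> filesOnDisk = ConcurrentHashMap.newKeySet();
  // Cached lookups: job template path -> whether it resolved when first loaded.
  private final Map<String, Boolean> cache = new ConcurrentHashMap<>();
  // Whether a file-creation event clears the cache.
  private final boolean clearOnCreate;

  TemplateCacheSketch(boolean clearOnCreate) {
    this.clearOnCreate = clearOnCreate;
  }

  /** Returns whether the job template resolved, loading it only on a cache miss. */
  boolean isResolved(String jobTemplatePath) {
    return cache.computeIfAbsent(jobTemplatePath, filesOnDisk::contains);
  }

  /** Simulates creating a file and the listener callback that follows it. */
  void createFile(String path) {
    filesOnDisk.add(path);
    if (clearOnCreate) {
      cache.clear();
    }
  }

  public static void main(String[] args) {
    // Without clearing on create, the "missing" result stays cached.
    TemplateCacheSketch stale = new TemplateCacheSketch(false);
    stale.isResolved("jobs/a.job");
    stale.createFile("jobs/a.job");
    System.out.println(stale.isResolved("jobs/a.job")); // prints "false"

    // With clearing on create, the next lookup reloads and sees the new file.
    TemplateCacheSketch fresh = new TemplateCacheSketch(true);
    fresh.isResolved("jobs/a.job");
    fresh.createFile("jobs/a.job");
    System.out.println(fresh.isResolved("jobs/a.job")); // prints "true"
  }
}
```

   The delete-and-recreate case is the same situation seen from the other
side: if only modification events cleared the cache, the recreated file would
arrive as a creation event and the old entry would survive.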

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
