This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b373031  [GOBBLIN-731] Make deserialization of FlowSpec more robust
b373031 is described below

commit b37303182212d96d42b336869ffe5b0f002c6595
Author: autumnust <[email protected]>
AuthorDate: Tue Apr 16 16:20:26 2019 -0700

    [GOBBLIN-731] Make deserialization of FlowSpec more robust
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [ ] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN-731
    ) issues and references them in the PR title.
    
    ### Description
    
    Catch exceptions when loading `FlowSpec`s from the
    FileSystem so that even if some of the spec files
    are problematic, the rest of them will still be
    successfully deserialized.
    
    Adding unit test for that.
    
    Removing some unnecessary modifiers while reading
    through the code.
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Make deserialization of spec more robust
    
    Address comments on adding serDeException in test
    
    Remove mistakenly brought imports
    
    Closes #2598 from autumnust/flowSpecSerDeRobust
---
 .../instrumented/StandardMetricsBridge.java        |  2 +-
 .../gobblin/runtime/api/MutableSpecCatalog.java    |  2 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 17 ++--
 .../gobblin/runtime/spec_store/FSSpecStore.java    | 18 +++-
 .../gobblin/spec_catalog/FlowCatalogTest.java      | 96 ++++++++++++++++++----
 5 files changed, 108 insertions(+), 27 deletions(-)

diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index b371fe1..5e5f4f9 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -40,7 +40,7 @@ public interface StandardMetricsBridge {
     return ImmutableList.of();
   }
 
-  public class StandardMetrics implements MetricSet {
+  class StandardMetrics implements MetricSet {
     protected final List<ContextAwareMetric> contextAwareMetrics;
     protected final Map<String, Metric> rawMetrics;
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 2ddbf7e..b64fb17 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -57,7 +57,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
   void remove(URI uri, Properties headers) throws SpecNotFoundException;
 
   @Slf4j
-  public static class MutableStandardMetrics extends StandardMetrics {
+  class MutableStandardMetrics extends StandardMetrics {
     public static final String TIME_FOR_SPEC_CATALOG_REMOVE = 
"timeForSpecCatalogRemove";
     public static final String TIME_FOR_SPEC_CATALOG_PUT = 
"timeForSpecCatalogPut";
     @Getter private final ContextAwareTimer timeForSpecCatalogPut;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index a7db4fd..ddf8cbe 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -205,24 +205,25 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
   /* Catalog core functionality                     *
   /**************************************************/
 
+  /**
+   * Get all specs from {@link SpecStore}
+   */
   @Override
   public Collection<Spec> getSpecs() {
     try {
       return specStore.getSpecs();
+      // TODO: Have kind of metrics keeping track of specs that failed to be 
deserialized.
+
     } catch (IOException e) {
       throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
     }
   }
 
   public Collection<Spec> getSpecsWithTimeUpdate() {
-    try {
-      long startTime = System.currentTimeMillis();
-      Collection<Spec> specs = specStore.getSpecs();
-      this.metrics.updateGetSpecTime(startTime);
-      return specs;
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
-    }
+    long startTime = System.currentTimeMillis();
+    Collection<Spec> specs = this.getSpecs();
+    this.metrics.updateGetSpecTime(startTime);
+    return specs;
   }
 
   public boolean exists(URI uri) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index d87d6d4..0e2c482 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -238,13 +238,27 @@ public class FSSpecStore implements SpecStore {
     return specs;
   }
 
-  private void getSpecs(Path directory, Collection<Spec> specs) throws 
IOException {
+  /**
+   * For multiple {@link FlowSpec}s to be loaded, catch Exceptions when one of 
them failed to be loaded and
+   * continue with the rest.
+   *
+   * The {@link IOException} thrown from standard FileSystem call will be 
propagated, while the file-specific
+   * exception will be caught to ensure other files being able to deserialized.
+   *
+   * @param directory The directory that contains specs to be deserialized
+   * @param specs Container of specs.
+   */
+  private void getSpecs(Path directory, Collection<Spec> specs) throws 
Exception {
     FileStatus[] fileStatuses = fs.listStatus(directory);
     for (FileStatus fileStatus : fileStatuses) {
       if (fileStatus.isDirectory()) {
         getSpecs(fileStatus.getPath(), specs);
       } else {
-        specs.add(readSpecFromFile(fileStatus.getPath()));
+        try {
+          specs.add(readSpecFromFile(fileStatus.getPath()));
+        } catch (Exception e) {
+          log.warn(String.format("Path[%s] cannot be correctly deserialized as 
Spec", fileStatus.getPath()), e);
+        }
       }
     }
   }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index 537cdbe..2d73290 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -17,15 +17,37 @@
 
 package org.apache.gobblin.spec_catalog;
 
+import com.google.common.base.Optional;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -33,20 +55,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
 
 public class FlowCatalogTest {
   private static final Logger logger = 
LoggerFactory.getLogger(FlowCatalog.class);
@@ -117,6 +125,64 @@ public class FlowCatalogTest {
     }
   }
 
+  /**
+   * Make sure that when there's on spec failed to be deserialized, the rest 
of spec in specStore can
+   * still be taken care of.
+   */
+  @Test
+  public void testGetSpecRobustness() throws Exception {
+
+    File specDir = Files.createTempDir();
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, 
specDir.getAbsolutePath());
+    SpecSerDe serde = Mockito.mock(SpecSerDe.class);
+    TestFsSpecStore fsSpecStore = new 
TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
+
+    // Version is specified as 0,1,2
+    File specFileFail = new File(specDir, "spec_fail");
+    Assert.assertTrue(specFileFail.createNewFile());
+    File specFile1 = new File(specDir, "spec0");
+    Assert.assertTrue(specFile1.createNewFile());
+    File specFile2 = new File(specDir, "spec1");
+    Assert.assertTrue(specFile2.createNewFile());
+    File specFile3 = new File(specDir, "serDeFail");
+    Assert.assertTrue(specFile3.createNewFile());
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Assert.assertEquals(fs.getFileStatus(new 
Path(specFile3.getAbsolutePath())).getLen(), 0);
+
+    Collection<Spec> specList = fsSpecStore.getSpecs();
+    // The fail and serDe datasets wouldn't survive
+    Assert.assertEquals(specList.size(), 2);
+    for (Spec spec: specList) {
+      Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
+      Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
+    }
+  }
+
+  class TestFsSpecStore extends FSSpecStore {
+    public TestFsSpecStore(Config sysConfig, SpecSerDe specSerDe) throws 
IOException {
+      super(sysConfig, specSerDe);
+    }
+
+    @Override
+    protected Spec readSpecFromFile(Path path) throws IOException {
+      if (path.getName().contains("fail")) {
+        throw new IOException("Mean to fail in the test");
+      } else if (path.getName().contains("serDeFail")) {
+
+        // Simulate the way that a serDe exception
+        FSDataInputStream fis = fs.open(path);
+        SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
+
+        // This line should never be reached since we generate SerDe Exception 
on purpose.
+        Assert.assertTrue(false);
+        return null;
+      }
+      else return initFlowSpec();
+    }
+  }
+
   @Test
   public void createFlowSpec() {
     // List Current Specs

Reply via email to