This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 44c7316  [GOBBLIN-1520] Standardize FsSpec's protocol in file name
44c7316 is described below

commit 44c73168125e1132b0cf511a78bee3cfc88522d3
Author: Lei Sun <[email protected]>
AuthorDate: Fri Aug 20 14:56:27 2021 -0700

    [GOBBLIN-1520] Standardize FsSpec's protocol in file name
    
    Standardize FsSpec's protocol in file name
    
    Address reviewer's comments
    
    Remove unused imports
    
    Closes #3370 from autumnust/spec-consumer-avro
---
 .../apache/gobblin/runtime/api/FsSpecConsumer.java |  5 +++-
 .../apache/gobblin/runtime/api/FsSpecProducer.java | 12 +++++++-
 .../gobblin/runtime/api/FsSpecProducerTest.java    | 33 +++++++++++++++++++---
 .../java/org/apache/gobblin/util/AvroUtils.java    |  2 +-
 4 files changed, 45 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 7e3d286..514338b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -44,8 +44,10 @@ import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.filters.AndPathFilter;
 import org.apache.gobblin.util.filters.HiddenFilter;
 
 
@@ -82,7 +84,8 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
     List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
     FileStatus[] fileStatuses;
     try {
-      fileStatuses = this.fs.listStatus(this.specDirPath, new HiddenFilter());
+      fileStatuses = this.fs.listStatus(this.specDirPath,
+          new AndPathFilter(new HiddenFilter(), new 
AvroUtils.AvroPathFilter()));
     } catch (IOException e) {
       log.error("Error when listing files at path: {}", 
this.specDirPath.toString(), e);
       return null;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
index c626b20..338613b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
@@ -49,6 +50,12 @@ import org.apache.gobblin.util.HadoopUtils;
 /**
  * An implementation of {@link SpecProducer} that produces {@link JobSpec}s to 
the {@value FsSpecConsumer#SPEC_PATH_KEY}
  * for consumption by the {@link FsSpecConsumer}.
+ *
+ * The {@link FsSpecProducer} and {@link FsSpecConsumer} pair assumes Avro as the serialization format. More specifically,
+ * {@link JobSpec}s are serialized into ".avro" files by {@link FsSpecProducer}, and {@link FsSpecConsumer} filters out
+ * all files without the proper suffix to avoid loading corrupted {@link JobSpec}s that may exist due to
+ * ungraceful exits of the application or weak file system semantics.
+ *
  */
 @Slf4j
 public class FsSpecProducer implements SpecProducer<Spec> {
@@ -136,7 +143,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
     DatumWriter<AvroJobSpec> datumWriter = new 
SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
     DataFileWriter<AvroJobSpec> dataFileWriter = new 
DataFileWriter<>(datumWriter);
 
-    Path jobSpecPath = new Path(this.specConsumerPath, jobSpec.getUri());
+    Path jobSpecPath = new Path(this.specConsumerPath, 
annotateSpecFileName(jobSpec.getUri()));
 
     //Write the new JobSpec to a temporary path first.
     Path tmpDir = new Path(this.specConsumerPath, 
UUID.randomUUID().toString());
@@ -160,4 +167,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
     fs.delete(tmpJobSpecPath.getParent(), true);
   }
 
+  private String annotateSpecFileName(String rawName) {
+    return rawName + AvroUtils.AVRO_SUFFIX;
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
index 52dcf8b..5e43eb6 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.testng.Assert;
@@ -40,18 +41,26 @@ import org.apache.gobblin.util.ConfigUtils;
 public class FsSpecProducerTest {
   private FsSpecProducer _fsSpecProducer;
   private FsSpecConsumer _fsSpecConsumer;
+  private Config _config;
+  private File workDir;
 
   @BeforeMethod
   public void setUp()
       throws IOException {
-    File tmpDir = Files.createTempDir();
+    this.workDir = Files.createTempDir();
+    this.workDir.deleteOnExit();
     Config config = 
ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, 
ConfigValueFactory.fromAnyRef(
-        tmpDir.getAbsolutePath()));
+        this.workDir.getAbsolutePath()));
     this._fsSpecProducer = new FsSpecProducer(config);
     this._fsSpecConsumer = new FsSpecConsumer(config);
+    this._config = config;
   }
 
   private JobSpec createTestJobSpec() throws URISyntaxException {
+    return createTestJobSpec("testJob");
+  }
+
+  private JobSpec createTestJobSpec(String jobSpecUri) throws 
URISyntaxException {
     Properties properties = new Properties();
     properties.put("key1", "val1");
     properties.put("key2", "val2");
@@ -59,7 +68,7 @@ public class FsSpecProducerTest {
     properties.put("key3.1", "val3");
     properties.put("key3.1.1", "val4");
 
-    JobSpec jobSpec = JobSpec.builder("testJob")
+    JobSpec jobSpec = JobSpec.builder(jobSpecUri)
         .withConfig(ConfigUtils.propertiesToConfig(properties))
         .withVersion("1")
         .withDescription("")
@@ -69,9 +78,14 @@ public class FsSpecProducerTest {
 
   @Test
   public void testAddSpec()
-      throws URISyntaxException, ExecutionException, InterruptedException {
+      throws URISyntaxException, ExecutionException, InterruptedException, 
IOException {
     this._fsSpecProducer.addSpec(createTestJobSpec());
 
+    // Add a random file (with a non-Avro extension) to the folder observed by the consumer; it shall not be picked up.
+    File randomFile = new File(workDir, "random");
+    Assert.assertTrue(randomFile.createNewFile());
+    randomFile.deleteOnExit();
+
     List<Pair<SpecExecutor.Verb, Spec>> jobSpecs = 
this._fsSpecConsumer.changedSpecs().get();
     Assert.assertEquals(jobSpecs.size(), 1);
     Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.ADD);
@@ -80,6 +94,17 @@ public class FsSpecProducerTest {
     Assert.assertEquals(((JobSpec) 
jobSpecs.get(0).getRight()).getConfig().getString("key2"), "val2");
     Assert.assertEquals(((JobSpec) 
jobSpecs.get(0).getRight()).getConfig().getString("key3.1" + 
ConfigUtils.STRIP_SUFFIX), "val3");
     Assert.assertEquals(((JobSpec) 
jobSpecs.get(0).getRight()).getConfig().getString("key3.1.1"), "val4");
+    jobSpecs.clear();
+
+    // Other JobSpecs written as .avro files by the spec producer shall still be found.
+    this._fsSpecProducer.addSpec(createTestJobSpec("newTestJob"));
+    jobSpecs = this._fsSpecConsumer.changedSpecs().get();
+    Assert.assertEquals(jobSpecs.size(), 2);
+    Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.ADD);
+    Assert.assertEquals(jobSpecs.get(1).getLeft(), SpecExecutor.Verb.ADD);
+    List<String> uriList = jobSpecs.stream().map(s -> 
s.getRight().getUri().toString()).collect(Collectors.toList());
+    Assert.assertTrue(uriList.contains( "testJob"));
+    Assert.assertTrue(uriList.contains( "newTestJob"));
   }
 
   @Test (dependsOnMethods = "testAddSpec")
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 3d2587f..0ac1544 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -93,7 +93,7 @@ public class AvroUtils {
 
   public static final String FIELD_LOCATION_DELIMITER = ".";
 
-  private static final String AVRO_SUFFIX = ".avro";
+  public static final String AVRO_SUFFIX = ".avro";
 
   private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
 

Reply via email to