This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 44c7316 [GOBBLIN-1520] Standardize FsSpec's protocol in file name
44c7316 is described below
commit 44c73168125e1132b0cf511a78bee3cfc88522d3
Author: Lei Sun <[email protected]>
AuthorDate: Fri Aug 20 14:56:27 2021 -0700
[GOBBLIN-1520] Standardize FsSpec's protocol in file name
Standardize FsSpec's protocol in file name
Address reviewer's comments
Remove unused imports
Closes #3370 from autumnust/spec-consumer-avro
---
.../apache/gobblin/runtime/api/FsSpecConsumer.java | 5 +++-
.../apache/gobblin/runtime/api/FsSpecProducer.java | 12 +++++++-
.../gobblin/runtime/api/FsSpecProducerTest.java | 33 +++++++++++++++++++---
.../java/org/apache/gobblin/util/AvroUtils.java | 2 +-
4 files changed, 45 insertions(+), 7 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 7e3d286..514338b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -44,8 +44,10 @@ import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.filters.AndPathFilter;
import org.apache.gobblin.util.filters.HiddenFilter;
@@ -82,7 +84,8 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
List<Pair<SpecExecutor.Verb, Spec>> specList = new ArrayList<>();
FileStatus[] fileStatuses;
try {
- fileStatuses = this.fs.listStatus(this.specDirPath, new HiddenFilter());
+ fileStatuses = this.fs.listStatus(this.specDirPath,
+ new AndPathFilter(new HiddenFilter(), new
AvroUtils.AvroPathFilter()));
} catch (IOException e) {
log.error("Error when listing files at path: {}",
this.specDirPath.toString(), e);
return null;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
index c626b20..338613b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
@@ -49,6 +50,12 @@ import org.apache.gobblin.util.HadoopUtils;
/**
* An implementation of {@link SpecProducer} that produces {@link JobSpec}s to
the {@value FsSpecConsumer#SPEC_PATH_KEY}
* for consumption by the {@link FsSpecConsumer}.
+ *
+ * The pair {@link FsSpecProducer} and {@link FsSpecConsumer} assumes Avro
as the serialization format. More specifically,
+ * {@link JobSpec}s are serialized as ".avro" files by {@link
FsSpecProducer}, and {@link FsSpecConsumer} filters out
+ * all files without the proper suffix to avoid loading corrupted {@link
JobSpec}s that could possibly exist due to
+ * ungraceful exits of the application or weak file system semantics.
+ *
*/
@Slf4j
public class FsSpecProducer implements SpecProducer<Spec> {
@@ -136,7 +143,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
DatumWriter<AvroJobSpec> datumWriter = new
SpecificDatumWriter<>(AvroJobSpec.SCHEMA$);
DataFileWriter<AvroJobSpec> dataFileWriter = new
DataFileWriter<>(datumWriter);
- Path jobSpecPath = new Path(this.specConsumerPath, jobSpec.getUri());
+ Path jobSpecPath = new Path(this.specConsumerPath,
annotateSpecFileName(jobSpec.getUri()));
//Write the new JobSpec to a temporary path first.
Path tmpDir = new Path(this.specConsumerPath,
UUID.randomUUID().toString());
@@ -160,4 +167,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
fs.delete(tmpJobSpecPath.getParent(), true);
}
+ private String annotateSpecFileName(String rawName) {
+ return rawName + AvroUtils.AVRO_SUFFIX;
+ }
}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
index 52dcf8b..5e43eb6 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FsSpecProducerTest.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
@@ -40,18 +41,26 @@ import org.apache.gobblin.util.ConfigUtils;
public class FsSpecProducerTest {
private FsSpecProducer _fsSpecProducer;
private FsSpecConsumer _fsSpecConsumer;
+ private Config _config;
+ private File workDir;
@BeforeMethod
public void setUp()
throws IOException {
- File tmpDir = Files.createTempDir();
+ this.workDir = Files.createTempDir();
+ this.workDir.deleteOnExit();
Config config =
ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY,
ConfigValueFactory.fromAnyRef(
- tmpDir.getAbsolutePath()));
+ this.workDir.getAbsolutePath()));
this._fsSpecProducer = new FsSpecProducer(config);
this._fsSpecConsumer = new FsSpecConsumer(config);
+ this._config = config;
}
private JobSpec createTestJobSpec() throws URISyntaxException {
+ return createTestJobSpec("testJob");
+ }
+
+ private JobSpec createTestJobSpec(String jobSpecUri) throws
URISyntaxException {
Properties properties = new Properties();
properties.put("key1", "val1");
properties.put("key2", "val2");
@@ -59,7 +68,7 @@ public class FsSpecProducerTest {
properties.put("key3.1", "val3");
properties.put("key3.1.1", "val4");
- JobSpec jobSpec = JobSpec.builder("testJob")
+ JobSpec jobSpec = JobSpec.builder(jobSpecUri)
.withConfig(ConfigUtils.propertiesToConfig(properties))
.withVersion("1")
.withDescription("")
@@ -69,9 +78,14 @@ public class FsSpecProducerTest {
@Test
public void testAddSpec()
- throws URISyntaxException, ExecutionException, InterruptedException {
+ throws URISyntaxException, ExecutionException, InterruptedException,
IOException {
this._fsSpecProducer.addSpec(createTestJobSpec());
+ // Add a random file (with a non-Avro extension) to the folder
observed by the consumer; it shall not be picked up.
+ File randomFile = new File(workDir, "random");
+ Assert.assertTrue(randomFile.createNewFile());
+ randomFile.deleteOnExit();
+
List<Pair<SpecExecutor.Verb, Spec>> jobSpecs =
this._fsSpecConsumer.changedSpecs().get();
Assert.assertEquals(jobSpecs.size(), 1);
Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.ADD);
@@ -80,6 +94,17 @@ public class FsSpecProducerTest {
Assert.assertEquals(((JobSpec)
jobSpecs.get(0).getRight()).getConfig().getString("key2"), "val2");
Assert.assertEquals(((JobSpec)
jobSpecs.get(0).getRight()).getConfig().getString("key3.1" +
ConfigUtils.STRIP_SUFFIX), "val3");
Assert.assertEquals(((JobSpec)
jobSpecs.get(0).getRight()).getConfig().getString("key3.1.1"), "val4");
+ jobSpecs.clear();
+
+ // If other JobSpecs are added as .avro files by the spec producer,
they shall still be found.
+ this._fsSpecProducer.addSpec(createTestJobSpec("newTestJob"));
+ jobSpecs = this._fsSpecConsumer.changedSpecs().get();
+ Assert.assertEquals(jobSpecs.size(), 2);
+ Assert.assertEquals(jobSpecs.get(0).getLeft(), SpecExecutor.Verb.ADD);
+ Assert.assertEquals(jobSpecs.get(1).getLeft(), SpecExecutor.Verb.ADD);
+ List<String> uriList = jobSpecs.stream().map(s ->
s.getRight().getUri().toString()).collect(Collectors.toList());
+ Assert.assertTrue(uriList.contains( "testJob"));
+ Assert.assertTrue(uriList.contains( "newTestJob"));
}
@Test (dependsOnMethods = "testAddSpec")
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 3d2587f..0ac1544 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -93,7 +93,7 @@ public class AvroUtils {
public static final String FIELD_LOCATION_DELIMITER = ".";
- private static final String AVRO_SUFFIX = ".avro";
+ public static final String AVRO_SUFFIX = ".avro";
private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";