This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f029b4c  [GOBBLIN-734] Fix speculative safety checking in HiveWritable 
writer
f029b4c is described below

commit f029b4c0fea0fe4aa62f36dda2512344ff708bae
Author: autumnust <[email protected]>
AuthorDate: Fri Apr 26 10:22:39 2019 -0700

    [GOBBLIN-734] Fix speculative safety checking in HiveWritable writer
    
    Fix speculative safety checking in HiveWritable
    writer
    
    Fix HiveSerDe Test
    
    Fix unit test: Missing file loading path
    
    Closes #2601 from
    autumnust/fixSpeculativeExecOnHiveWritableWriter
---
 .../org/apache/gobblin/writer/FsDataWriter.java    |  4 +++
 .../gobblin/writer/HiveWritableHdfsDataWriter.java |  5 ++++
 .../org/apache/gobblin/serde/HiveSerDeTest.java    | 35 ++++++++++++----------
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index dadb51f..23a885a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -345,6 +345,10 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     return this.fs.makeQualified(this.outputFile).toString();
   }
 
+  /**
+   * Classes that extends this method needs to determine if 
writerAttemptIdOptional is present and to avoid
+   * problems of overriding, adding another checking on class type.
+   */
   @Override
   public boolean isSpeculativeAttemptSafe() {
     return this.writerAttemptIdOptional.isPresent() && this.getClass() == 
FsDataWriter.class;
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index 1145920..47a596e 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -114,4 +114,9 @@ public class HiveWritableHdfsDataWriter extends 
FsDataWriter<Writable> {
 
     super.commit();
   }
+
+  @Override
+  public boolean isSpeculativeAttemptSafe() {
+    return this.writerAttemptIdOptional.isPresent() && this.getClass() == 
HiveWritableHdfsDataWriter.class;
+  }
 }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java 
b/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
index ec7086e..cdc2047 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
@@ -17,23 +17,14 @@
 
 package org.apache.gobblin.serde;
 
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.io.Closer;
-
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
@@ -50,6 +41,13 @@ import org.apache.gobblin.writer.Destination.DestinationType;
 import org.apache.gobblin.writer.HiveWritableHdfsDataWriter;
 import org.apache.gobblin.writer.HiveWritableHdfsDataWriterBuilder;
 import org.apache.gobblin.writer.WriterOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 
 /**
@@ -72,10 +70,12 @@ public class HiveSerDeTest {
    */
   @Test(groups = { "gobblin.serde" })
   public void testAvroOrcSerDes()
-      throws IOException, DataRecordException, DataConversionException {
+      throws IOException, DataRecordException, DataConversionException, 
URISyntaxException {
     Properties properties = new Properties();
-    properties.load(new 
FileReader("gobblin-core/src/test/resources/serde/serde.properties"));
+    
properties.load(HiveSerDeTest.class.getClassLoader().getResourceAsStream("serde/serde.properties"));
     SourceState sourceState = new SourceState(new State(properties), 
ImmutableList.<WorkUnitState> of());
+    File schemaFile = new 
File(HiveSerDeTest.class.getClassLoader().getResource("serde/serde.avsc").toURI());
+    sourceState.setProp("avro.schema.url" , schemaFile.getAbsolutePath());
 
     OldApiWritableFileSource source = new OldApiWritableFileSource();
     List<WorkUnit> workUnits = source.getWorkunits(sourceState);
@@ -94,8 +94,11 @@ public class HiveSerDeTest {
       writer =
           closer.register((HiveWritableHdfsDataWriter) new 
HiveWritableHdfsDataWriterBuilder<>().withBranches(1)
               .withWriterId("0").writeTo(Destination.of(DestinationType.HDFS, 
sourceState))
+              .withAttemptId("0-0")
               .writeInFormat(WriterOutputFormat.ORC).build());
 
+      Assert.assertTrue(writer.isSpeculativeAttemptSafe());
+
       converter.init(wus);
       Writable record;
 

Reply via email to