This is an automated email from the ASF dual-hosted git repository.
ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f029b4c [GOBBLIN-734] Fix speculative safety checking in HiveWritable writer
f029b4c is described below
commit f029b4c0fea0fe4aa62f36dda2512344ff708bae
Author: autumnust <[email protected]>
AuthorDate: Fri Apr 26 10:22:39 2019 -0700
[GOBBLIN-734] Fix speculative safety checking in HiveWritable writer
Fix speculative safety checking in HiveWritable writer
Fix HiveSerDe test
Fix unit test: load test resources from the classpath instead of a hard-coded relative file path
Closes #2601 from autumnust/fixSpeculativeExecOnHiveWritableWriter
---
.../org/apache/gobblin/writer/FsDataWriter.java | 4 +++
.../gobblin/writer/HiveWritableHdfsDataWriter.java | 5 ++++
.../org/apache/gobblin/serde/HiveSerDeTest.java | 35 ++++++++++++----------
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index dadb51f..23a885a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -345,6 +345,10 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
return this.fs.makeQualified(this.outputFile).toString();
}
+ /**
+ * Subclasses that override this method must check whether writerAttemptIdOptional is present; the class-type
+ * check below keeps subclasses that do not override it from being treated as speculative-attempt safe.
+ */
@Override
public boolean isSpeculativeAttemptSafe() {
return this.writerAttemptIdOptional.isPresent() && this.getClass() ==
FsDataWriter.class;
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
index 1145920..47a596e 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriter.java
@@ -114,4 +114,9 @@ public class HiveWritableHdfsDataWriter extends
FsDataWriter<Writable> {
super.commit();
}
+
+ @Override
+ public boolean isSpeculativeAttemptSafe() {
+ return this.writerAttemptIdOptional.isPresent() && this.getClass() ==
HiveWritableHdfsDataWriter.class;
+ }
}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
index ec7086e..cdc2047 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/serde/HiveSerDeTest.java
@@ -17,23 +17,14 @@
package org.apache.gobblin.serde;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.io.Closer;
-
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
@@ -50,6 +41,13 @@ import org.apache.gobblin.writer.Destination.DestinationType;
import org.apache.gobblin.writer.HiveWritableHdfsDataWriter;
import org.apache.gobblin.writer.HiveWritableHdfsDataWriterBuilder;
import org.apache.gobblin.writer.WriterOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
/**
@@ -72,10 +70,12 @@ public class HiveSerDeTest {
*/
@Test(groups = { "gobblin.serde" })
public void testAvroOrcSerDes()
- throws IOException, DataRecordException, DataConversionException {
+ throws IOException, DataRecordException, DataConversionException,
URISyntaxException {
Properties properties = new Properties();
- properties.load(new
FileReader("gobblin-core/src/test/resources/serde/serde.properties"));
+
properties.load(HiveSerDeTest.class.getClassLoader().getResourceAsStream("serde/serde.properties"));
SourceState sourceState = new SourceState(new State(properties),
ImmutableList.<WorkUnitState> of());
+ File schemaFile = new
File(HiveSerDeTest.class.getClassLoader().getResource("serde/serde.avsc").toURI());
+ sourceState.setProp("avro.schema.url" , schemaFile.getAbsolutePath());
OldApiWritableFileSource source = new OldApiWritableFileSource();
List<WorkUnit> workUnits = source.getWorkunits(sourceState);
@@ -94,8 +94,11 @@ public class HiveSerDeTest {
writer =
closer.register((HiveWritableHdfsDataWriter) new
HiveWritableHdfsDataWriterBuilder<>().withBranches(1)
.withWriterId("0").writeTo(Destination.of(DestinationType.HDFS,
sourceState))
+ .withAttemptId("0-0")
.writeInFormat(WriterOutputFormat.ORC).build());
+ Assert.assertTrue(writer.isSpeculativeAttemptSafe());
+
converter.init(wus);
Writable record;