This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


View the commit online:
https://github.com/apache/incubator-gobblin/commit/afdc95c6c26d67e4362d81010a52c13af5ac204d

The following commit(s) were added to refs/heads/master by this push:
     new afdc95c  [GOBBLIN-971] Enable speculative execution awareness for RowQualityChecker
afdc95c is described below

commit afdc95c6c26d67e4362d81010a52c13af5ac204d
Author: autumnust <[email protected]>
AuthorDate: Wed Nov 20 14:25:54 2019 -0800

    [GOBBLIN-971] Enable speculative execution awareness for RowQualityChecker
    
    Closes #2819 from autumnust/PolicyCheckerfileHanlderContentionWhenSpeculativeExecutionHappens
---
 .../qualitychecker/row/RowLevelPolicyChecker.java  | 33 ++++++++++++++++--
 .../{ => row}/RowLevelQualityCheckerTest.java      | 40 +++++++++++++++++++---
 2 files changed, 67 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
index 5312828..9fa6111 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -45,7 +46,17 @@ import javax.annotation.concurrent.ThreadSafe;
 import lombok.Getter;
 
 
-public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, 
RecordStreamProcessor<S, S, D, D> {
+public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, 
RecordStreamProcessor<S, S, D, D>,
+                                                    
SpeculativeAttemptAwareConstruct {
+
+  /**
+   * Given the existence of writer object when the policy is set to {@link 
RowLevelPolicy.Type#ERR_FILE}, objects of
+   * this class need to be speculative-attempt-aware.
+   */
+  @Override
+  public boolean isSpeculativeAttemptSafe() {
+    return this.list.stream().noneMatch(x -> 
x.getType().equals(RowLevelPolicy.Type.ERR_FILE)) || 
this.allowSpeculativeExecWhenWriteErrFile;
+  }
 
   private final List<RowLevelPolicy> list;
   private final String stateId;
@@ -56,6 +67,17 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
   private RowLevelErrFileWriter writer;
   @Getter
   private final RowLevelPolicyCheckResults results;
+  /** Flag to determine if it is safe to enable speculative execution when a policy is set to ERR_FILE.
+   * Users are advised to turn this off, since it could run into HDFS file lease contention if multiple
+   * speculative attempts are appending to the same ERR_FILE.
+   *
+   * When an ERR_FILE policy is present and users force this flag to true, RowLevelPolicyChecker will create a
+   * different ERR_FILE with a timestamp in its name to avoid contention, but there is no guarantee, as
+   * different containers' clocks are hard to coordinate.
+   * */
+  private boolean allowSpeculativeExecWhenWriteErrFile;
+
+  static final String ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY = 
"allowSpeculativeExecutionWithErrFilePolicy";
 
   public RowLevelPolicyChecker(List<RowLevelPolicy> list, String stateId, 
FileSystem fs) {
     this(list, stateId, fs, new State());
@@ -71,6 +93,9 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
     this.results = new RowLevelPolicyCheckResults();
     this.sampler = new 
FrontLoadedSampler(state.getPropAsLong(ConfigurationKeys.ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK,
         ConfigurationKeys.DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK), 1.5);
+
+    /** Defaults to true so as to maintain backward compatibility. */
+    this.allowSpeculativeExecWhenWriteErrFile = 
state.getPropAsBoolean(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, true);
   }
 
   public boolean executePolicies(Object record, RowLevelPolicyCheckResults 
results) throws IOException {
@@ -98,11 +123,15 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
     return true;
   }
 
-  private Path getErrFilePath(RowLevelPolicy policy) {
+  Path getErrFilePath(RowLevelPolicy policy) {
     String errFileName = HadoopUtils.sanitizePath(policy.toString(), "-");
     if (!Strings.isNullOrEmpty(this.stateId)) {
       errFileName += "-" + this.stateId;
     }
+    if (allowSpeculativeExecWhenWriteErrFile) {
+      errFileName += "-" + System.currentTimeMillis();
+    }
+
     errFileName += ".err";
     return new Path(policy.getErrFileLocation(), errFileName);
   }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
similarity index 63%
rename from 
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java
rename to 
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
index 808cfef..48eb818 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.qualitychecker;
+package org.apache.gobblin.qualitychecker.row;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
-import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
-import 
org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilderFactory;
+import org.apache.gobblin.qualitychecker.TestConstants;
+import org.apache.gobblin.qualitychecker.TestRowLevelPolicy;
 import java.io.File;
 import java.net.URI;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
@@ -36,6 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static 
org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker.ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY;
+
 
 @Test(groups = {"gobblin.qualitychecker"})
 public class RowLevelQualityCheckerTest {
@@ -57,6 +60,35 @@ public class RowLevelQualityCheckerTest {
     }
   }
 
+  public void testFileNameWithTimestamp() throws Exception {
+    State state = new State();
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestRowLevelPolicy");
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE");
+    state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, 
TestConstants.TEST_ERR_FILE);
+    RowLevelPolicyChecker checker =
+        new 
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
+    Path path = checker.getErrFilePath(new TestRowLevelPolicy(state, 
RowLevelPolicy.Type.ERR_FILE));
+
+    // Verify that path follows the structure which contains timestamp.
+    Pattern pattern = 
Pattern.compile("test\\/org.apache.gobblin.qualitychecker.TestRowLevelPolicy-\\d+\\.err");
+    Matcher matcher = pattern.matcher(path.toString());
+    Assert.assertTrue(matcher.matches());
+
+    // Positive case with multiple non-ERR_FILE policies specified.
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestRowLevelPolicy,org.apache.gobblin.qualitychecker.TestRowLevelPolicy");
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, 
"FAIL,OPTIONAL");
+    state.setProp(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, false);
+    checker =
+        new 
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
+    Assert.assertTrue(checker.isSpeculativeAttemptSafe());
+
+    // Negative case with multiple policies, one of which is ERR_FILE.
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, 
"FAIL,ERR_FILE");
+    checker =
+        new 
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
+    Assert.assertFalse(checker.isSpeculativeAttemptSafe());
+  }
+
   @Test(groups = {"ignore"})
   public void testWriteToErrFile()
       throws Exception {

Reply via email to