This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
View the commit online: https://github.com/apache/incubator-gobblin/commit/afdc95c6c26d67e4362d81010a52c13af5ac204d The following commit(s) were added to refs/heads/master by this push: new afdc95c [GOBBLIN-971] Enable speculative execution awareness for RowQualityChecker afdc95c is described below commit afdc95c6c26d67e4362d81010a52c13af5ac204d Author: autumnust <[email protected]> AuthorDate: Wed Nov 20 14:25:54 2019 -0800 [GOBBLIN-971] Enable speculative execution awareness for RowQualityChecker Closes #2819 from autumnust/PolicyCheckerfileHanlderContentionWhenSpeculativeExecutionHappens --- .../qualitychecker/row/RowLevelPolicyChecker.java | 33 ++++++++++++++++-- .../{ => row}/RowLevelQualityCheckerTest.java | 40 +++++++++++++++++++--- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java index 5312828..9fa6111 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -45,7 +46,17 @@ import javax.annotation.concurrent.ThreadSafe; import lombok.Getter; -public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, RecordStreamProcessor<S, S, D, D> { +public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, RecordStreamProcessor<S, S, D, D>, + SpeculativeAttemptAwareConstruct { + + /** + * Given the existence of writer object when the policy is set to {@link RowLevelPolicy.Type#ERR_FILE}, objects of + * this class needs to be 
speculative-attempt-aware. + */ + @Override + public boolean isSpeculativeAttemptSafe() { + return this.list.stream().noneMatch(x -> x.getType().equals(RowLevelPolicy.Type.ERR_FILE)) || this.allowSpeculativeExecWhenWriteErrFile; + } private final List<RowLevelPolicy> list; private final String stateId; @@ -56,6 +67,17 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor private RowLevelErrFileWriter writer; @Getter private final RowLevelPolicyCheckResults results; + /** Flag to determine if it is safe to enable speculative execution when policy is set to ERR_FILE + * Users are suggested to turn this off since it could potentially run into HDFS file lease contention if multiple + * speculative execution are appending to the same ERR_FILE. + * + * When there are ERR_FILE policy appears and users are enforcing to set it to true, RowPolicyChecker will created + * different ERR_FILE with timestamp in name to avoid contention but there's no guarantee as + * different containers' clock is hard to coordinate. 
+ * */ + private boolean allowSpeculativeExecWhenWriteErrFile; + + static final String ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY = "allowSpeculativeExecutionWithErrFilePolicy"; public RowLevelPolicyChecker(List<RowLevelPolicy> list, String stateId, FileSystem fs) { this(list, stateId, fs, new State()); @@ -71,6 +93,9 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor this.results = new RowLevelPolicyCheckResults(); this.sampler = new FrontLoadedSampler(state.getPropAsLong(ConfigurationKeys.ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK, ConfigurationKeys.DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK), 1.5); + + /** By default set to true as to maintain backward-compatibility */ + this.allowSpeculativeExecWhenWriteErrFile = state.getPropAsBoolean(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, true); } public boolean executePolicies(Object record, RowLevelPolicyCheckResults results) throws IOException { @@ -98,11 +123,15 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor return true; } - private Path getErrFilePath(RowLevelPolicy policy) { + Path getErrFilePath(RowLevelPolicy policy) { String errFileName = HadoopUtils.sanitizePath(policy.toString(), "-"); if (!Strings.isNullOrEmpty(this.stateId)) { errFileName += "-" + this.stateId; } + if (allowSpeculativeExecWhenWriteErrFile) { + errFileName += "-" + System.currentTimeMillis(); + } + errFileName += ".err"; return new Path(policy.getErrFileLocation(), errFileName); } diff --git a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java similarity index 63% rename from gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java rename to gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java index 808cfef..48eb818 100644 --- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/RowLevelQualityCheckerTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.gobblin.qualitychecker; +package org.apache.gobblin.qualitychecker.row; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; -import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults; -import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker; -import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilderFactory; +import org.apache.gobblin.qualitychecker.TestConstants; +import org.apache.gobblin.qualitychecker.TestRowLevelPolicy; import java.io.File; import java.net.URI; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.FileReader; @@ -36,6 +37,8 @@ import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import static org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker.ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY; + @Test(groups = {"gobblin.qualitychecker"}) public class RowLevelQualityCheckerTest { @@ -57,6 +60,35 @@ public class RowLevelQualityCheckerTest { } } + public void testFileNameWithTimestamp() throws Exception { + State state = new State(); + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, "org.apache.gobblin.qualitychecker.TestRowLevelPolicy"); + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE"); + state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE); + RowLevelPolicyChecker checker = + new RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, -1).build(); + Path path = checker.getErrFilePath(new TestRowLevelPolicy(state, RowLevelPolicy.Type.ERR_FILE)); + + // Verify that path follows the 
structure which contains timestamp. + Pattern pattern = Pattern.compile("test\\/org.apache.gobblin.qualitychecker.TestRowLevelPolicy-\\d+\\.err"); + Matcher matcher = pattern.matcher(path.toString()); + Assert.assertTrue(matcher.matches()); + + // Positive case with multiple non-err_file policy specified. + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, "org.apache.gobblin.qualitychecker.TestRowLevelPolicy,org.apache.gobblin.qualitychecker.TestRowLevelPolicy"); + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "FAIL,OPTIONAL"); + state.setProp(ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY, false); + checker = + new RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, -1).build(); + Assert.assertTrue(checker.isSpeculativeAttemptSafe()); + + // Negative case with multiple policy containing err_file + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "FAIL,ERR_FILE"); + checker = + new RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, -1).build(); + Assert.assertFalse(checker.isSpeculativeAttemptSafe()); + } + @Test(groups = {"ignore"}) public void testWriteToErrFile() throws Exception {
