This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 4ab42c3 [GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file 4ab42c3 is described below commit 4ab42c30e20398910b9496eaece9006183ef931f Author: Lei Sun <le...@linkedin.com> AuthorDate: Thu Apr 16 13:59:57 2020 -0700 [GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file Enable close-on-flush for quality-checker's err-file Address comments Closes #2959 from autumnust/qualityCheckererrFileFlush --- .../qualitychecker/row/RowLevelPolicyChecker.java | 28 +++++++--- .../qualitychecker/TestRowLevelPolicyFail.java | 2 +- .../row/RowLevelQualityCheckerTest.java | 59 +++++++++++++++++++++- 3 files changed, 80 insertions(+), 9 deletions(-) diff --git a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java index 08d81fe..e7b94e4 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java @@ -23,11 +23,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct; +import org.apache.gobblin.stream.FlushControlMessage; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.google.common.base.Strings; -import com.google.common.io.Closer; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; @@ -44,8 +44,10 @@ import org.apache.gobblin.util.HadoopUtils; import io.reactivex.Flowable; import javax.annotation.concurrent.ThreadSafe; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, RecordStreamProcessor<S, S, D, D>, SpeculativeAttemptAwareConstruct { @@ -63,7 +65,6 
@@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor private final String stateId; private final FileSystem fs; private boolean errFileOpen; - private final Closer closer; private final FrontLoadedSampler sampler; private RowLevelErrFileWriter writer; @Getter @@ -89,8 +90,6 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor this.stateId = stateId; this.fs = fs; this.errFileOpen = false; - this.closer = Closer.create(); - this.writer = this.closer.register(new RowLevelErrFileWriter(this.fs)); this.results = new RowLevelPolicyCheckResults(); this.sampler = new FrontLoadedSampler(state.getPropAsLong(ConfigurationKeys.ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK, ConfigurationKeys.DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK), 1.5); @@ -121,6 +120,7 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor } else if (p.getType().equals(RowLevelPolicy.Type.ERR_FILE)) { if (this.sampler.acceptNext()) { if (!this.errFileOpen) { + this.writer = new RowLevelErrFileWriter(this.fs); this.writer.open(getErrFilePath(p)); this.writer.write(record); } else { @@ -150,7 +150,8 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor @Override public void close() throws IOException { if (this.errFileOpen) { - this.closer.close(); + this.writer.close(); + this.errFileOpen = false; } } @@ -197,7 +198,22 @@ public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, Recor * @return a {@link ControlMessageHandler}. */ protected ControlMessageHandler getMessageHandler() { - return ControlMessageHandler.NOOP; + /** + * When seeing {@link FlushControlMessage and using ERR_FILE as the quality-checker handling, + * close the open error file and create new one. 
+ */ + return new ControlMessageHandler() { + @Override + public void handleMessage(ControlMessage message) { + if (message instanceof FlushControlMessage ) { + try { + RowLevelPolicyChecker.this.close(); + } catch (IOException ioe) { + log.error("Failed to close errFile", ioe); + } + } + } + }; } /** diff --git a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java index fb2458a..ea27df4 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java @@ -28,6 +28,6 @@ public class TestRowLevelPolicyFail extends RowLevelPolicy { @Override public Result executePolicy(Object record) { - return RowLevelPolicy.Result.PASSED; + return RowLevelPolicy.Result.FAILED; } } diff --git a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java index 2725e06..09dc729 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java @@ -17,6 +17,9 @@ package org.apache.gobblin.qualitychecker.row; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.qualitychecker.TestConstants; @@ -25,6 +28,8 @@ import java.io.File; import java.io.Flushable; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,11 
+43,15 @@ import org.apache.gobblin.records.ControlMessageHandler; import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.stream.FlushControlMessage; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.io.Files; + +import static org.apache.gobblin.configuration.ConfigurationKeys.ROW_LEVEL_ERR_FILE; import static org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker.ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY; import static org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilder.ROW_LEVEL_POLICY_CHECKER_TYPE; @@ -67,6 +76,52 @@ public class RowLevelQualityCheckerTest { } } + /** + * Verify close-on-flush for errFile in quality checker. + */ + public void testErrFileCloseOnFlush() throws Exception { + File dir = Files.createTempDir(); + dir.deleteOnExit(); + + State state = new State(); + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, "org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail"); + state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, RowLevelPolicy.Type.ERR_FILE); + state.setProp(ROW_LEVEL_ERR_FILE, dir.getAbsolutePath()); + + RowLevelPolicyChecker checker = + new RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, -1).build(); + RowLevelPolicyCheckResults results = new RowLevelPolicyCheckResults(); + + Schema intSchema = SchemaBuilder.record("test") + .fields() + .name("a").type().intType().noDefault() + .endRecord(); + GenericRecord intRecord = new GenericRecordBuilder(intSchema) + .set("a", 1) + .build(); + + GenericRecord intRecord_2 = new GenericRecordBuilder(intSchema) + .set("a", 2) + .build(); + + Assert.assertFalse(checker.executePolicies(intRecord, results)); + // Inspect the folder: Should see files with zero byte + FileSystem fs = FileSystem.getLocal(new 
Configuration()); + List<FileStatus> fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath()))); + Assert.assertEquals(fileList.size(), 1); + Assert.assertEquals(fileList.get(0).getLen(),0 ); + + // This should trigger errFile flush so that file size should be larger than zero. + checker.getMessageHandler().handleMessage(FlushControlMessage.builder().build()); + fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath()))); + Assert.assertTrue(fileList.get(0).getLen() > 0); + + // This should trigger a new errFile created. + Assert.assertFalse(checker.executePolicies(intRecord_2, results)); + fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath()))); + Assert.assertEquals(fileList.size(), 2); + } + // Verify rowPolicyChecker is configurable. public void testRowPolicyCheckerBuilder() throws Exception { State state = new State(); @@ -81,7 +136,7 @@ public class RowLevelQualityCheckerTest { State state = new State(); state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, "org.apache.gobblin.qualitychecker.TestRowLevelPolicy"); state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE"); - state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE); + state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE); RowLevelPolicyChecker checker = new RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, -1).build(); Path path = checker.getErrFilePath(new TestRowLevelPolicy(state, RowLevelPolicy.Type.ERR_FILE)); @@ -112,7 +167,7 @@ public class RowLevelQualityCheckerTest { State state = new State(); state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, "org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail"); state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE"); - state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE); + state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE); state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
TestConstants.TEST_FS_URI); RowLevelPolicyChecker checker =