This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4ab42c3 [GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file
4ab42c3 is described below
commit 4ab42c30e20398910b9496eaece9006183ef931f
Author: Lei Sun <[email protected]>
AuthorDate: Thu Apr 16 13:59:57 2020 -0700
[GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file
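Enable close-on-flush for the quality checker's err-file: when a
FlushControlMessage is received, RowLevelPolicyChecker closes its open
error file (if any), and a new error file is opened for the next record
routed to the ERR_FILE policy.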
Address comments
Closes #2959 from autumnust/qualityCheckererrFileFlush
---
.../qualitychecker/row/RowLevelPolicyChecker.java | 28 +++++++---
.../qualitychecker/TestRowLevelPolicyFail.java | 2 +-
.../row/RowLevelQualityCheckerTest.java | 59 +++++++++++++++++++++-
3 files changed, 80 insertions(+), 9 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
index 08d81fe..e7b94e4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
@@ -23,11 +23,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
+import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Strings;
-import com.google.common.io.Closer;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -44,8 +44,10 @@ import org.apache.gobblin.util.HadoopUtils;
import io.reactivex.Flowable;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState,
RecordStreamProcessor<S, S, D, D>,
SpeculativeAttemptAwareConstruct {
@@ -63,7 +65,6 @@ public class RowLevelPolicyChecker<S, D> implements
Closeable, FinalState, Recor
private final String stateId;
private final FileSystem fs;
private boolean errFileOpen;
- private final Closer closer;
private final FrontLoadedSampler sampler;
private RowLevelErrFileWriter writer;
@Getter
@@ -89,8 +90,6 @@ public class RowLevelPolicyChecker<S, D> implements
Closeable, FinalState, Recor
this.stateId = stateId;
this.fs = fs;
this.errFileOpen = false;
- this.closer = Closer.create();
- this.writer = this.closer.register(new RowLevelErrFileWriter(this.fs));
this.results = new RowLevelPolicyCheckResults();
this.sampler = new
FrontLoadedSampler(state.getPropAsLong(ConfigurationKeys.ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK,
ConfigurationKeys.DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK), 1.5);
@@ -121,6 +120,7 @@ public class RowLevelPolicyChecker<S, D> implements
Closeable, FinalState, Recor
} else if (p.getType().equals(RowLevelPolicy.Type.ERR_FILE)) {
if (this.sampler.acceptNext()) {
if (!this.errFileOpen) {
+ this.writer = new RowLevelErrFileWriter(this.fs);
this.writer.open(getErrFilePath(p));
this.writer.write(record);
} else {
@@ -150,7 +150,8 @@ public class RowLevelPolicyChecker<S, D> implements
Closeable, FinalState, Recor
@Override
public void close() throws IOException {
if (this.errFileOpen) {
- this.closer.close();
+ this.writer.close();
+ this.errFileOpen = false;
}
}
@@ -197,7 +198,22 @@ public class RowLevelPolicyChecker<S, D> implements
Closeable, FinalState, Recor
* @return a {@link ControlMessageHandler}.
*/
protected ControlMessageHandler getMessageHandler() {
- return ControlMessageHandler.NOOP;
+ /**
+ * When seeing {@link FlushControlMessage and using ERR_FILE as the
quality-checker handling,
+ * close the open error file and create new one.
+ */
+ return new ControlMessageHandler() {
+ @Override
+ public void handleMessage(ControlMessage message) {
+ if (message instanceof FlushControlMessage ) {
+ try {
+ RowLevelPolicyChecker.this.close();
+ } catch (IOException ioe) {
+ log.error("Failed to close errFile", ioe);
+ }
+ }
+ }
+ };
}
/**
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
index fb2458a..ea27df4 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
@@ -28,6 +28,6 @@ public class TestRowLevelPolicyFail extends RowLevelPolicy {
@Override
public Result executePolicy(Object record) {
- return RowLevelPolicy.Result.PASSED;
+ return RowLevelPolicy.Result.FAILED;
}
}
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
index 2725e06..09dc729 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.qualitychecker.row;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.qualitychecker.TestConstants;
@@ -25,6 +28,8 @@ import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -38,11 +43,15 @@ import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.google.common.io.Files;
+
+import static
org.apache.gobblin.configuration.ConfigurationKeys.ROW_LEVEL_ERR_FILE;
import static
org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker.ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY;
import static
org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilder.ROW_LEVEL_POLICY_CHECKER_TYPE;
@@ -67,6 +76,52 @@ public class RowLevelQualityCheckerTest {
}
}
+ /**
+ * Verify close-on-flush for errFile in quality checker.
+ */
+ public void testErrFileCloseOnFlush() throws Exception {
+ File dir = Files.createTempDir();
+ dir.deleteOnExit();
+
+ State state = new State();
+ state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST,
"org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail");
+ state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE,
RowLevelPolicy.Type.ERR_FILE);
+ state.setProp(ROW_LEVEL_ERR_FILE, dir.getAbsolutePath());
+
+ RowLevelPolicyChecker checker =
+ new
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state,
-1).build();
+ RowLevelPolicyCheckResults results = new RowLevelPolicyCheckResults();
+
+ Schema intSchema = SchemaBuilder.record("test")
+ .fields()
+ .name("a").type().intType().noDefault()
+ .endRecord();
+ GenericRecord intRecord = new GenericRecordBuilder(intSchema)
+ .set("a", 1)
+ .build();
+
+ GenericRecord intRecord_2 = new GenericRecordBuilder(intSchema)
+ .set("a", 2)
+ .build();
+
+ Assert.assertFalse(checker.executePolicies(intRecord, results));
+ // Inspect the folder: Should see files with zero byte
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ List<FileStatus> fileList = Arrays.asList(fs.listStatus(new
Path(dir.getPath())));
+ Assert.assertEquals(fileList.size(), 1);
+ Assert.assertEquals(fileList.get(0).getLen(),0 );
+
+ // This should trigger errFile flush so that file size should be larger
than zero.
+
checker.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+ fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath())));
+ Assert.assertTrue(fileList.get(0).getLen() > 0);
+
+ // This should trigger a new errFile created.
+ Assert.assertFalse(checker.executePolicies(intRecord_2, results));
+ fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath())));
+ Assert.assertEquals(fileList.size(), 2);
+ }
+
// Verify rowPolicyChecker is configurable.
public void testRowPolicyCheckerBuilder() throws Exception {
State state = new State();
@@ -81,7 +136,7 @@ public class RowLevelQualityCheckerTest {
State state = new State();
state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST,
"org.apache.gobblin.qualitychecker.TestRowLevelPolicy");
state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE");
- state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE,
TestConstants.TEST_ERR_FILE);
+ state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE);
RowLevelPolicyChecker checker =
new
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state,
-1).build();
Path path = checker.getErrFilePath(new TestRowLevelPolicy(state,
RowLevelPolicy.Type.ERR_FILE));
@@ -112,7 +167,7 @@ public class RowLevelQualityCheckerTest {
State state = new State();
state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST,
"org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail");
state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE");
- state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE,
TestConstants.TEST_ERR_FILE);
+ state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE);
state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
TestConstants.TEST_FS_URI);
RowLevelPolicyChecker checker =