This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ab42c3  [GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file
4ab42c3 is described below

commit 4ab42c30e20398910b9496eaece9006183ef931f
Author: Lei Sun <le...@linkedin.com>
AuthorDate: Thu Apr 16 13:59:57 2020 -0700

    [GOBBLIN-1119] Enable close-on-flush for quality-checker's err-file
    
    Enable close-on-flush for quality-checker's err-file
    
    Address comments
    
    Closes #2959 from autumnust/qualityCheckererrFileFlush
---
 .../qualitychecker/row/RowLevelPolicyChecker.java  | 28 +++++++---
 .../qualitychecker/TestRowLevelPolicyFail.java     |  2 +-
 .../row/RowLevelQualityCheckerTest.java            | 59 +++++++++++++++++++++-
 3 files changed, 80 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
index 08d81fe..e7b94e4 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/row/RowLevelPolicyChecker.java
@@ -23,11 +23,11 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
+import org.apache.gobblin.stream.FlushControlMessage;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Strings;
-import com.google.common.io.Closer;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -44,8 +44,10 @@ import org.apache.gobblin.util.HadoopUtils;
 import io.reactivex.Flowable;
 import javax.annotation.concurrent.ThreadSafe;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 
+@Slf4j
 public class RowLevelPolicyChecker<S, D> implements Closeable, FinalState, 
RecordStreamProcessor<S, S, D, D>,
                                                     
SpeculativeAttemptAwareConstruct {
 
@@ -63,7 +65,6 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
   private final String stateId;
   private final FileSystem fs;
   private boolean errFileOpen;
-  private final Closer closer;
   private final FrontLoadedSampler sampler;
   private RowLevelErrFileWriter writer;
   @Getter
@@ -89,8 +90,6 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
     this.stateId = stateId;
     this.fs = fs;
     this.errFileOpen = false;
-    this.closer = Closer.create();
-    this.writer = this.closer.register(new RowLevelErrFileWriter(this.fs));
     this.results = new RowLevelPolicyCheckResults();
     this.sampler = new 
FrontLoadedSampler(state.getPropAsLong(ConfigurationKeys.ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK,
         ConfigurationKeys.DEFAULT_ROW_LEVEL_ERR_FILE_RECORDS_PER_TASK), 1.5);
@@ -121,6 +120,7 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
       } else if (p.getType().equals(RowLevelPolicy.Type.ERR_FILE)) {
         if (this.sampler.acceptNext()) {
           if (!this.errFileOpen) {
+            this.writer = new RowLevelErrFileWriter(this.fs);
             this.writer.open(getErrFilePath(p));
             this.writer.write(record);
           } else {
@@ -150,7 +150,8 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
   @Override
   public void close() throws IOException {
     if (this.errFileOpen) {
-      this.closer.close();
+      this.writer.close();
+      this.errFileOpen = false;
     }
   }
 
@@ -197,7 +198,22 @@ public class RowLevelPolicyChecker<S, D> implements 
Closeable, FinalState, Recor
    * @return a {@link ControlMessageHandler}.
    */
   protected ControlMessageHandler getMessageHandler() {
-    return ControlMessageHandler.NOOP;
+    /**
+     * When seeing a {@link FlushControlMessage} while using ERR_FILE as the quality-checker handling,
+     * close the currently open error file; a new one is opened when the next failed record is written.
+     */
+    return new ControlMessageHandler() {
+      @Override
+      public void handleMessage(ControlMessage message) {
+        if (message instanceof FlushControlMessage ) {
+          try {
+            RowLevelPolicyChecker.this.close();
+          } catch (IOException ioe) {
+            log.error("Failed to close errFile", ioe);
+          }
+        }
+      }
+    };
   }
 
   /**
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
index fb2458a..ea27df4 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/TestRowLevelPolicyFail.java
@@ -28,6 +28,6 @@ public class TestRowLevelPolicyFail extends RowLevelPolicy {
 
   @Override
   public Result executePolicy(Object record) {
-    return RowLevelPolicy.Result.PASSED;
+    return RowLevelPolicy.Result.FAILED;
   }
 }
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
index 2725e06..09dc729 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/row/RowLevelQualityCheckerTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.qualitychecker.row;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.qualitychecker.TestConstants;
@@ -25,6 +28,8 @@ import java.io.File;
 import java.io.Flushable;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -38,11 +43,15 @@ import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.stream.FlushControlMessage;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.io.Files;
+
+import static 
org.apache.gobblin.configuration.ConfigurationKeys.ROW_LEVEL_ERR_FILE;
 import static 
org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker.ALLOW_SPECULATIVE_EXECUTION_WITH_ERR_FILE_POLICY;
 import static 
org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckerBuilder.ROW_LEVEL_POLICY_CHECKER_TYPE;
 
@@ -67,6 +76,52 @@ public class RowLevelQualityCheckerTest {
     }
   }
 
+  /**
+   * Verify close-on-flush for errFile in quality checker.
+   */
+  public void testErrFileCloseOnFlush() throws Exception {
+    File dir = Files.createTempDir();
+    dir.deleteOnExit();
+
+    State state = new State();
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail");
+    state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, 
RowLevelPolicy.Type.ERR_FILE);
+    state.setProp(ROW_LEVEL_ERR_FILE, dir.getAbsolutePath());
+
+    RowLevelPolicyChecker checker =
+        new 
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
+    RowLevelPolicyCheckResults results = new RowLevelPolicyCheckResults();
+
+    Schema intSchema = SchemaBuilder.record("test")
+        .fields()
+        .name("a").type().intType().noDefault()
+        .endRecord();
+    GenericRecord intRecord = new GenericRecordBuilder(intSchema)
+        .set("a", 1)
+        .build();
+
+    GenericRecord intRecord_2 = new GenericRecordBuilder(intSchema)
+        .set("a", 2)
+        .build();
+
+    Assert.assertFalse(checker.executePolicies(intRecord, results));
+    // Inspect the folder: should see exactly one file, still zero bytes long.
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    List<FileStatus> fileList = Arrays.asList(fs.listStatus(new 
Path(dir.getPath())));
+    Assert.assertEquals(fileList.size(), 1);
+    Assert.assertEquals(fileList.get(0).getLen(),0 );
+
+    // This should trigger an errFile flush, so the file size should now be larger than zero.
+    
checker.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+    fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath())));
+    Assert.assertTrue(fileList.get(0).getLen() > 0);
+
+    // This should trigger a new errFile created.
+    Assert.assertFalse(checker.executePolicies(intRecord_2, results));
+    fileList = Arrays.asList(fs.listStatus(new Path(dir.getPath())));
+    Assert.assertEquals(fileList.size(), 2);
+  }
+
   // Verify rowPolicyChecker is configurable.
   public void testRowPolicyCheckerBuilder() throws Exception {
     State state = new State();
@@ -81,7 +136,7 @@ public class RowLevelQualityCheckerTest {
     State state = new State();
     state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestRowLevelPolicy");
     state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE");
-    state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, 
TestConstants.TEST_ERR_FILE);
+    state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE);
     RowLevelPolicyChecker checker =
         new 
RowLevelPolicyCheckerBuilderFactory().newPolicyCheckerBuilder(state, 
-1).build();
     Path path = checker.getErrFilePath(new TestRowLevelPolicy(state, 
RowLevelPolicy.Type.ERR_FILE));
@@ -112,7 +167,7 @@ public class RowLevelQualityCheckerTest {
     State state = new State();
     state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST, 
"org.apache.gobblin.qualitychecker.TestRowLevelPolicyFail");
     state.setProp(ConfigurationKeys.ROW_LEVEL_POLICY_LIST_TYPE, "ERR_FILE");
-    state.setProp(ConfigurationKeys.ROW_LEVEL_ERR_FILE, 
TestConstants.TEST_ERR_FILE);
+    state.setProp(ROW_LEVEL_ERR_FILE, TestConstants.TEST_ERR_FILE);
     state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
TestConstants.TEST_FS_URI);
 
     RowLevelPolicyChecker checker =

Reply via email to