This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 631deca  [GOBBLIN-848] Make initialization of CompactionSource extensible with certain protection
631deca is described below

commit 631deca45936f134f77d750e614c5c11ee1d4852
Author: autumnust <[email protected]>
AuthorDate: Fri Aug 9 15:06:37 2019 -0700

    [GOBBLIN-848] Make initialization of CompactionSource extensible with certain protection
    
    Closes #2703 from autumnust/mixFormatCompaction
---
 .../compaction/source/CompactionSource.java        | 73 +++++++++++++---------
 travis/test-build.sh                               |  2 +-
 travis/test-group1.sh                              |  2 +-
 3 files changed, 44 insertions(+), 33 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index dbe5256..149bdd0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.compaction.source;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -32,23 +31,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -88,6 +70,24 @@ import 
org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
 import org.apache.gobblin.util.request_allocation.ResourceEstimator;
 import org.apache.gobblin.util.request_allocation.ResourcePool;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.gobblin.util.HadoopUtils.getSourceFileSystem;
+
 
 /**
  * A compaction source derived from {@link Source} which uses {@link 
DefaultFileSystemGlobFinder} to find all
@@ -110,13 +110,8 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
   @Override
   public WorkUnitStream getWorkunitStream(SourceState state) {
     try {
-      fs = getSourceFileSystem(state);
-      state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
-      suite = 
CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
+      initCompactionSource(state);
 
-      initRequestAllocator(state);
-      initJobDir(state);
-      copyJarDependencies(state);
       DatasetsFinder finder = 
DatasetUtils.instantiateDatasetFinder(state.getProperties(),
               getSourceFileSystem(state),
               DefaultFileSystemGlobFinder.class.getName());
@@ -219,6 +214,29 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     }
   }
 
+  /**
+   * A non-extensible init method for {@link CompactionSource}; it leaves the extensible
+   * {@link #optionalInit(SourceState)} hook for derived classes to add customized initialization.
+   *
+   * Compared to making this method protected directly, this approach is less error-prone, since all initialization
+   * happening inside {@link #initCompactionSource(SourceState)} is compulsory.
+   */
+  private void initCompactionSource(SourceState state) throws IOException {
+    fs = getSourceFileSystem(state);
+    state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
+    suite = 
CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
+
+    initRequestAllocator(state);
+    initJobDir(state);
+    copyJarDependencies(state);
+
+    optionalInit(state);
+  }
+
+  protected void optionalInit(SourceState state) {
+    // do nothing.
+  }
+
   private void initRequestAllocator (State state) {
     try {
       ResourceEstimator estimator = 
GobblinConstructorUtils.<ResourceEstimator>invokeLongestConstructor(
@@ -437,13 +455,6 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     }
   }
 
-  public static FileSystem getSourceFileSystem(State state)
-          throws IOException {
-    Configuration conf = HadoopUtils.getConfFromState(state);
-    String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI);
-    return 
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), 
conf), state);
-  }
-
   /**
    * Create a temporary job directory based on job id or (if not available) 
UUID
    */
diff --git a/travis/test-build.sh b/travis/test-build.sh
index 4604d26..caf1495 100755
--- a/travis/test-build.sh
+++ b/travis/test-build.sh
@@ -24,4 +24,4 @@
 set -e
 
 echo "Starting $0 at " $(date)
-time ./gradlew clean build -x test -Dorg.gradle.parallel=false 
$GOBBLIN_GRADLE_OPTS
+time ./gradlew clean build -x test -x javadoc -Dorg.gradle.parallel=true 
$GOBBLIN_GRADLE_OPTS
diff --git a/travis/test-group1.sh b/travis/test-group1.sh
index 0c02d5c..fc04830 100755
--- a/travis/test-group1.sh
+++ b/travis/test-group1.sh
@@ -32,4 +32,4 @@ echo "Precompiling tests:"
 rm -rf $HOME/.gradle/caches/
 ./gradlew compileTest -Porg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS
 echo "Running tests for $TEST_GROUP1"
-time ./gradlew -PskipTestGroup=disabledOnTravis -PrunTestGroups=$TEST_GROUP1 
-Dorg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS test
+time ./gradlew -PskipTestGroup=disabledOnTravis -PrunTestGroups=$TEST_GROUP1 
-Dorg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS test
\ No newline at end of file

Reply via email to