This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 631deca [GOBBLIN-848] Make initialization of CompactionSource
extensible with certain protection
631deca is described below
commit 631deca45936f134f77d750e614c5c11ee1d4852
Author: autumnust <[email protected]>
AuthorDate: Fri Aug 9 15:06:37 2019 -0700
[GOBBLIN-848] Make initialization of CompactionSource extensible with
certain protection
Closes #2703 from autumnust/mixFormatCompaction
---
.../compaction/source/CompactionSource.java | 73 +++++++++++++---------
travis/test-build.sh | 2 +-
travis/test-group1.sh | 2 +-
3 files changed, 44 insertions(+), 33 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index dbe5256..149bdd0 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.compaction.source;
import java.io.IOException;
-import java.net.URI;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -32,23 +31,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.suite.CompactionSuite;
@@ -88,6 +70,24 @@ import
org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
import org.apache.gobblin.util.request_allocation.ResourceEstimator;
import org.apache.gobblin.util.request_allocation.ResourcePool;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.gobblin.util.HadoopUtils.getSourceFileSystem;
+
/**
* A compaction source derived from {@link Source} which uses {@link
DefaultFileSystemGlobFinder} to find all
@@ -110,13 +110,8 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
@Override
public WorkUnitStream getWorkunitStream(SourceState state) {
try {
- fs = getSourceFileSystem(state);
- state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
- suite =
CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
+ initCompactionSource(state);
- initRequestAllocator(state);
- initJobDir(state);
- copyJarDependencies(state);
DatasetsFinder finder =
DatasetUtils.instantiateDatasetFinder(state.getProperties(),
getSourceFileSystem(state),
DefaultFileSystemGlobFinder.class.getName());
@@ -219,6 +214,29 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
}
}
+ /**
+ * A non-extensible init method for {@link CompactionSource}, while it
leaves the
+ * extensible {@link #optionalInit(SourceState)} to derived classes for adding
customized initialization.
+ *
+ * Compared to making this method protected directly, this approach is less
error-prone since all initialization
+ * happening inside {@link #initCompactionSource(SourceState)} is compulsory.
+ */
+ private void initCompactionSource(SourceState state) throws IOException {
+ fs = getSourceFileSystem(state);
+ state.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
+ suite =
CompactionSuiteUtils.getCompactionSuiteFactory(state).createSuite(state);
+
+ initRequestAllocator(state);
+ initJobDir(state);
+ copyJarDependencies(state);
+
+ optionalInit(state);
+ }
+
+ protected void optionalInit(SourceState state) {
+ // do nothing.
+ }
+
private void initRequestAllocator (State state) {
try {
ResourceEstimator estimator =
GobblinConstructorUtils.<ResourceEstimator>invokeLongestConstructor(
@@ -437,13 +455,6 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
}
}
- public static FileSystem getSourceFileSystem(State state)
- throws IOException {
- Configuration conf = HadoopUtils.getConfFromState(state);
- String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI,
ConfigurationKeys.LOCAL_FS_URI);
- return
HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri),
conf), state);
- }
-
/**
* Create a temporary job directory based on job id or (if not available)
UUID
*/
diff --git a/travis/test-build.sh b/travis/test-build.sh
index 4604d26..caf1495 100755
--- a/travis/test-build.sh
+++ b/travis/test-build.sh
@@ -24,4 +24,4 @@
set -e
echo "Starting $0 at " $(date)
-time ./gradlew clean build -x test -Dorg.gradle.parallel=false
$GOBBLIN_GRADLE_OPTS
+time ./gradlew clean build -x test -x javadoc -Dorg.gradle.parallel=true
$GOBBLIN_GRADLE_OPTS
diff --git a/travis/test-group1.sh b/travis/test-group1.sh
index 0c02d5c..fc04830 100755
--- a/travis/test-group1.sh
+++ b/travis/test-group1.sh
@@ -32,4 +32,4 @@ echo "Precompiling tests:"
rm -rf $HOME/.gradle/caches/
./gradlew compileTest -Porg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS
echo "Running tests for $TEST_GROUP1"
-time ./gradlew -PskipTestGroup=disabledOnTravis -PrunTestGroups=$TEST_GROUP1
-Dorg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS test
+time ./gradlew -PskipTestGroup=disabledOnTravis -PrunTestGroups=$TEST_GROUP1
-Dorg.gradle.parallel=false $GOBBLIN_GRADLE_OPTS test
\ No newline at end of file