Repository: incubator-gobblin
Updated Branches:
refs/heads/master 6b616d472 -> 344d6d3c6
The following changes were added:
- Refactored RetryWriter to gobblin-core to be able to use it in WriterUtils
without introducing circular dependencies.
- Introducing mkdirsWithRecursivePermissionWithRetry to be able to retry if
the directory does not exist right after creation on an eventually consistent
fs.
- Adding retry to publishers (data.publisher.retry.enabled=true) such as
TimestampDataPublisher and TimePartitionedDataPublisher to support eventually
consistent filesystem targets.
- The tmp fs can be specified with compaction.tmp.fs in a compaction job, to be
able to use hdfs for the tmp fs and store the result on S3. Earlier it was not
possible to use different filesystems for tmp and target.
- Retry can be enabled for compaction if you don't want it to fail immediately
when a directory does not show up right away, which can happen on an eventually
consistent fs (compaction.retry.enabled=true).
- Adding the dataset name to the compaction MR job name, which makes it
significantly easier to identify which compaction job belongs to which dataset.
- Some minor modifications to support non-Avro extensions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0e870956
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0e870956
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0e870956
Branch: refs/heads/master
Commit: 0e870956a01dc5cd62be4fa4977fa7f08bd90c8e
Parents: 280b1d3
Author: treff7es <[email protected]>
Authored: Wed Aug 23 16:09:26 2017 +0200
Committer: treff7es <[email protected]>
Committed: Wed Aug 23 16:09:26 2017 +0200
----------------------------------------------------------------------
.../gobblin/util/RecordCountProvider.java | 2 +-
.../compaction/mapreduce/MRCompactor.java | 4 +
.../mapreduce/MRCompactorJobRunner.java | 112 +++++++++++++++---
.../gobblin/writer/AsyncWriterManager.java | 4 +-
.../writer/exception/NonTransientException.java | 40 -------
gobblin-core/build.gradle | 1 -
.../gobblin/publisher/BaseDataPublisher.java | 60 ++++++++--
.../publisher/TimePartitionedDataPublisher.java | 4 +-
.../publisher/TimestampDataPublisher.java | 8 +-
.../apache/gobblin/retry/RetryerFactory.java | 116 -------------------
.../org/apache/gobblin/writer/RetryWriter.java | 2 +-
.../writer/http/SalesforceRestWriter.java | 7 +-
.../apache/gobblin/writer/RetryWriterTest.java | 10 +-
.../GoogleAnalyticsUnsampledExtractor.java | 15 +--
.../GoogleAnalyticsUnsampledExtractorTest.java | 12 +-
gobblin-utility/build.gradle | 1 +
.../exception/NonTransientException.java | 40 +++++++
.../org/apache/gobblin/util/WriterUtils.java | 35 +++++-
.../CompactionRecordCountProvider.java | 18 ++-
.../gobblin/util/retry/RetryerFactory.java | 116 +++++++++++++++++++
20 files changed, 384 insertions(+), 223 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-api/src/main/java/org/apache/gobblin/util/RecordCountProvider.java
----------------------------------------------------------------------
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/util/RecordCountProvider.java
b/gobblin-api/src/main/java/org/apache/gobblin/util/RecordCountProvider.java
index cb912b7..87a5429 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/util/RecordCountProvider.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/RecordCountProvider.java
@@ -37,7 +37,7 @@ public abstract class RecordCountProvider {
* Convert a {@link Path} from another {@link RecordCountProvider} so that
it can be used
* in {@link #getRecordCount(Path)} of this {@link RecordCountProvider}.
*/
- public Path convertPath(Path path, RecordCountProvider src) {
+ public Path convertPath(Path path, String extension, RecordCountProvider
src) {
if (this.getClass().equals(src.getClass())) {
return path;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
----------------------------------------------------------------------
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
index ad695c5..9e53ef4 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java
@@ -25,6 +25,7 @@ import static
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Statu
import static
org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,6 +257,8 @@ public class MRCompactor implements Compactor {
public static final String COMPACTION_TRACKING_EVENTS_NAMESPACE =
COMPACTION_PREFIX + "tracking.events";
public static final String COMPACTION_INPUT_PATH_TIME = COMPACTION_PREFIX +
"input.path.time";
+ public static final String COMPACTION_FILE_EXTENSION =
+ COMPACTION_PREFIX + "extension";
private static final long COMPACTION_JOB_WAIT_INTERVAL_SECONDS = 10;
private static final Map<Dataset, Job> RUNNING_MR_JOBS =
Maps.newConcurrentMap();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 491bc81..e436e4d 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -18,13 +18,16 @@
package org.apache.gobblin.compaction.mapreduce;
import java.io.IOException;
+import java.net.URI;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.math3.primes.Primes;
@@ -41,23 +44,28 @@ import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.Retryer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -68,8 +76,10 @@ import org.apache.gobblin.util.RecordCountProvider;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
import org.apache.gobblin.util.recordcount.LateFileRecordCountProvider;
+import org.apache.gobblin.util.retry.RetryerFactory;
-
+import static
org.apache.gobblin.compaction.mapreduce.MRCompactor.COMPACTION_PREFIX;
+import static org.apache.gobblin.util.retry.RetryerFactory.*;
/**
* This class is responsible for configuring and running a single MR job.
@@ -116,6 +126,8 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
public static final String HADOOP_JOB_NAME = "Gobblin MR Compaction";
private static final long MR_JOB_CHECK_COMPLETE_INTERVAL_MS = 5000;
+ private final boolean isRetryEnabled;
+ private final String tmpFsUri;
public enum Policy {
@@ -137,6 +149,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
protected final Dataset dataset;
protected final FileSystem fs;
+ protected final FileSystem tmpFs;
protected final FsPermission perm;
protected final boolean shouldDeduplicate;
protected final boolean outputDeduplicated;
@@ -151,11 +164,31 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
private final LateFileRecordCountProvider lateOutputRecordCountProvider;
private final DatasetHelper datasetHelper;
private final int copyLateDataThreadPoolSize;
+ private final String outputExtension;
private volatile Policy policy = Policy.DO_NOT_PUBLISH_DATA;
private volatile Status status = Status.RUNNING;
private final Cache<Path, List<Path>> applicablePathCache;
+ static final String COMPACTION_RETRY_PREFIX = COMPACTION_JOB_PREFIX +
"retry.";
+ static final String COMPACTION_RETRY_ENABLED = COMPACTION_RETRY_PREFIX +
"enabled";
+ static final String COMPACTION_TMP_FS = COMPACTION_PREFIX + "tmp.fs";
+
+ static final Config COMPACTION_RETRY_DEFAULTS;
+
+ static {
+ Map<String, Object> configMap =
+ ImmutableMap.<String, Object>builder()
+ .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L)) //Overall
retry for 2 minutes
+ .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to
retry 5 seconds
+ .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+ .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
+ .build();
+ COMPACTION_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
+ };
+
+ protected final Config retrierConfig;
+
protected MRCompactorJobRunner(Dataset dataset, FileSystem fs) {
this.dataset = dataset;
this.fs = fs;
@@ -185,6 +218,20 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
this.copyLateDataThreadPoolSize =
this.dataset.jobProps().getPropAsInt(COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE,
DEFAULT_COMPACTION_COPY_LATE_DATA_THREAD_POOL_SIZE);
+ this.tmpFsUri = this.dataset.jobProps().getProp(COMPACTION_TMP_FS,
+ null);
+
+ try {
+ Log.info("Tmp fs uri:"+this.tmpFsUri);
+ if (this.tmpFsUri != null) {
+ this.tmpFs = FileSystem.get(new URI(this.tmpFsUri), new
Configuration());
+ } else {
+ this.tmpFs = MRCompactorJobRunner.this.fs;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed get Filesystem from tmp fs uri", e);
+ }
+
try {
this.inputRecordCountProvider = (RecordCountProvider) Class
.forName(this.dataset.jobProps().getProp(MRCompactor.COMPACTION_INPUT_RECORD_COUNT_PROVIDER,
@@ -196,6 +243,8 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
.newInstance();
this.lateInputRecordCountProvider = new
LateFileRecordCountProvider(this.inputRecordCountProvider);
this.lateOutputRecordCountProvider = new
LateFileRecordCountProvider(this.outputRecordCountProvider);
+ this.isRetryEnabled=
this.dataset.jobProps().getPropAsBoolean(COMPACTION_RETRY_ENABLED,
+ false);
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate RecordCountProvider",
e);
}
@@ -203,6 +252,20 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
this.applicablePathCache =
CacheBuilder.newBuilder().maximumSize(2000).build();
this.datasetHelper = new DatasetHelper(this.dataset, this.fs,
this.getApplicableFileExtensions());
+ this.outputExtension =
this.dataset.jobProps().getProp(MRCompactor.COMPACTION_FILE_EXTENSION, ".avro");
+
+ if (this.isRetryEnabled) {
+ this.retrierConfig = ConfigBuilder.create()
+ .loadProps(this.dataset.jobProps().getProperties(),
COMPACTION_RETRY_PREFIX)
+ .build()
+ .withFallback(COMPACTION_RETRY_DEFAULTS);
+
+ LOG.info("Retry enabled for compaction publish :"+
retrierConfig.root().render(ConfigRenderOptions.concise()));
+ } else {
+ this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
+ LOG.info("Retry disabled for compaction");
+ }
+
}
@Override
@@ -325,6 +388,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
public Void call() throws Exception {
Path convertedFilePath =
MRCompactorJobRunner.this.outputRecordCountProvider.convertPath(
LateFileRecordCountProvider.restoreFilePath(filePath),
+ MRCompactorJobRunner.this.outputExtension,
MRCompactorJobRunner.this.inputRecordCountProvider);
String targetFileName = convertedFilePath.getName();
Path outPath =
MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(targetFileName,
@@ -364,7 +428,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
}
protected void configureJob(Job job) throws IOException {
- job.setJobName(HADOOP_JOB_NAME);
+ job.setJobName(HADOOP_JOB_NAME + " (" + this.dataset.getDatasetName() +
")");
configureInputAndOutputPaths(job);
configureMapper(job);
configureReducer(job);
@@ -514,29 +578,41 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
}
private void moveTmpPathToOutputPath() throws IOException {
+ Retryer<Void> retryer = RetryerFactory.newInstance(this.retrierConfig);
+
LOG.info(String.format("Moving %s to %s", this.dataset.outputTmpPath(),
this.dataset.outputPath()));
this.fs.delete(this.dataset.outputPath(), true);
- WriterUtils.mkdirsWithRecursivePermission(this.fs,
this.dataset.outputPath().getParent(), this.perm);
- if (!this.fs.rename(this.dataset.outputTmpPath(),
this.dataset.outputPath())) {
- throw new IOException(
- String.format("Unable to move %s to %s",
this.dataset.outputTmpPath(), this.dataset.outputPath()));
+ if (this.isRetryEnabled) {
+ try {
+ retryer.call(() -> {
+ if (fs.exists(this.dataset.outputPath())) {
+ throw new IOException("Path " + this.dataset.outputPath() + "
exists however it should not. Will wait more.");
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
+
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(MRCompactorJobRunner.this.fs,
this.dataset.outputPath().getParent(), this.perm, this.retrierConfig);
+
+ Log.info("Moving from fs: ("+MRCompactorJobRunner.this.tmpFs.getUri()+")
path: "+ this.dataset.outputTmpPath() + " to "+ "fs: ("+
FileSystem.get(this.dataset.outputPath().getParent().toUri(),
this.fs.getConf()).getUri()+") output path: " + this.dataset.outputPath());
+ HadoopUtils.movePath (MRCompactorJobRunner.this.tmpFs,
this.dataset.outputTmpPath(),
FileSystem.get(this.dataset.outputPath().getParent().toUri(),
this.fs.getConf()), this.dataset.outputPath(), false, this.fs.getConf()) ;
}
private void addFilesInTmpPathToOutputPath () throws IOException {
- List<Path> paths =
this.getApplicableFilePaths(this.dataset.outputTmpPath());
+ List<Path> paths =
this.getApplicableFilePaths(this.dataset.outputTmpPath(), this.tmpFs);
for (Path path: paths) {
String fileName = path.getName();
LOG.info(String.format("Adding %s to %s", path.toString(),
this.dataset.outputPath()));
Path outPath =
MRCompactorJobRunner.this.lateOutputRecordCountProvider.constructLateFilePath(fileName,
MRCompactorJobRunner.this.fs, this.dataset.outputPath());
- if (!this.fs.rename(path, outPath)) {
- throw new IOException(
- String.format("Unable to move %s to %s", path.toString(),
outPath.toString()));
- }
+ HadoopUtils.movePath(MRCompactorJobRunner.this.tmpFs, path,
+ FileSystem.get(this.dataset.outputPath().getParent().toUri(),
this.fs.getConf()), outPath, false, this.fs.getConf());
}
}
@@ -575,7 +651,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
* Get the list of file {@link Path}s in the given dataDir, which satisfy
the extension requirements
* of {@link #getApplicableFileExtensions()}.
*/
- private List<Path> getApplicableFilePaths(final Path dataDir) throws
IOException {
+ private List<Path> getApplicableFilePaths(final Path dataDir, final
FileSystem fs) throws IOException {
try {
return applicablePathCache.get(dataDir, new Callable<List<Path>>() {
@@ -585,7 +661,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
return Lists.newArrayList();
}
List<Path> paths = Lists.newArrayList();
- for (FileStatus fileStatus :
FileListUtils.listFilesRecursively(MRCompactorJobRunner.this.fs, dataDir,
+ for (FileStatus fileStatus : FileListUtils.listFilesRecursively(fs,
dataDir,
new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -619,15 +695,15 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
.additionalMetadata(
CompactionSlaEventHelper.LATE_RECORD_COUNT,
Long.toString(this.lateOutputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset
- .outputLatePath()))))
+ .outputLatePath(), this.fs))))
.additionalMetadata(
CompactionSlaEventHelper.REGULAR_RECORD_COUNT,
Long.toString(this.outputRecordCountProvider.getRecordCount(this.getApplicableFilePaths(this.dataset
- .outputPath()))))
+ .outputPath(), this.fs))))
.additionalMetadata(CompactionSlaEventHelper.RECOMPATED_METADATA_NAME,
Boolean.toString(this.dataset.needToRecompact())).build().submit();
} catch (Throwable e) {
- LOG.warn("Failed to submit compcation completed event:" + e, e);
+ LOG.warn("Failed to submit compaction completed event:" + e, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
----------------------------------------------------------------------
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index ac30c7b..4fcd508 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -43,12 +43,12 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import javax.annotation.Nonnull;
-
import lombok.Getter;
import lombok.Setter;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
@@ -60,8 +60,6 @@ import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.FinalState;
-import org.apache.gobblin.writer.exception.NonTransientException;
-
/**
* A Data Writer to use as a base for writing async writers.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core-base/src/main/java/org/apache/gobblin/writer/exception/NonTransientException.java
----------------------------------------------------------------------
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/exception/NonTransientException.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/exception/NonTransientException.java
deleted file mode 100644
index 26e4a0b..0000000
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/exception/NonTransientException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.writer.exception;
-
-/**
- * NonTransientException that shows this is a permanent failure where retry
cannot solve.
- */
-public class NonTransientException extends RuntimeException {
- private static final long serialVersionUID = -973030180704599529L;
-
- public NonTransientException() {
- super();
- }
-
- public NonTransientException(String message) {
- super(message);
- }
-
- public NonTransientException(String message, Throwable t) {
- super(message, t);
- }
-
- public NonTransientException(Throwable t) {
- super(t);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle
index c437e66..ee7a77c 100644
--- a/gobblin-core/build.gradle
+++ b/gobblin-core/build.gradle
@@ -53,7 +53,6 @@ dependencies {
compile externalDependency.scala
compile externalDependency.lombok
compile externalDependency.typesafeConfig
- compile externalDependency.guavaretrying
compile externalDependency.findBugsAnnotations
compile externalDependency.oltu
compile externalDependency.opencsv
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 19314e5..9bdcbdd 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -28,9 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
-import org.apache.gobblin.lineage.LineageInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -43,14 +43,20 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.lineage.LineageInfo;
import org.apache.gobblin.metadata.MetadataMerger;
import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
import org.apache.gobblin.util.ForkOperatorUtils;
@@ -62,6 +68,7 @@ import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
+import static org.apache.gobblin.util.retry.RetryerFactory.*;
/**
* A basic implementation of {@link SingleTaskDataPublisher} that publishes
the data from the writer output directory
@@ -105,6 +112,25 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
* for aggregating this information from all workunits so it can be
published.
*/
protected final Map<PartitionIdentifier, MetadataMerger<String>>
metadataMergers;
+ protected final boolean shouldRetry;
+
+ static final String DATA_PUBLISHER_RETRY_PREFIX =
ConfigurationKeys.DATA_PUBLISHER_PREFIX + ".retry.";
+ static final String PUBLISH_RETRY_ENABLED = DATA_PUBLISHER_RETRY_PREFIX +
"enabled";
+
+ static final Config PUBLISH_RETRY_DEFAULTS;
+ protected final Config retrierConfig;
+
+ static {
+ Map<String, Object> configMap =
+ ImmutableMap.<String, Object>builder()
+ .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L)) //Overall
retry for 2 minutes
+ .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to
retry 5 seconds
+ .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+ .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
+ .build();
+ PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
+ };
+
public BaseDataPublisher(State state)
throws IOException {
@@ -118,6 +144,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
}
this.numBranches =
this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
+ this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED,
false);
this.writerFileSystemByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
this.publisherFileSystemByBranches =
Lists.newArrayListWithCapacity(this.numBranches);
@@ -151,6 +178,19 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
FsPermission.getDefault().toShort(),
ConfigurationKeys.PERMISSION_PARSING_RADIX)));
}
+ if (this.shouldRetry) {
+ this.retrierConfig = ConfigBuilder.create()
+ .loadProps(this.getState().getProperties(),
DATA_PUBLISHER_RETRY_PREFIX)
+ .build()
+ .withFallback(PUBLISH_RETRY_DEFAULTS);
+ LOG.info("Retry enabled for publish with config : "+
retrierConfig.root().render(ConfigRenderOptions.concise()));
+
+ }else {
+ LOG.info("Retry disabled for publish.");
+ this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
+ }
+
+
this.parallelRunnerThreads =
state.getPropAsInt(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY,
ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS);
this.parallelRunnerCloser = Closer.create();
@@ -312,8 +352,8 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
if (publishSingleTaskData) {
// Create final output directory
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
- this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
+ this.permissions.get(branchId), retrierConfig);
addSingleTaskWriterOutputToExistingDir(writerOutputDir,
publisherOutputDir, state, branchId, parallelRunner);
} else {
if (writerOutputPathsMoved.contains(writerOutputDir)) {
@@ -341,8 +381,8 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
this.publisherFileSystemByBranches.get(branchId).delete(publisherOutputDir,
true);
} else {
// Create the parent directory of the final output directory if it
does not exist
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputDir.getParent(), this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+ publisherOutputDir.getParent(), this.permissions.get(branchId),
retrierConfig);
}
movePath(parallelRunner, state, writerOutputDir, publisherOutputDir,
branchId);
@@ -392,8 +432,8 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
String pathSuffix = taskOutputFile
.substring(taskOutputFile.indexOf(writerOutputDir.toString()) +
writerOutputDir.toString().length() + 1);
Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix);
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputPath.getParent(), this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+ publisherOutputPath.getParent(), this.permissions.get(branchId),
retrierConfig);
movePath(parallelRunner, workUnitState, taskOutputPath,
publisherOutputPath, branchId);
}
@@ -588,9 +628,9 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
FileSystem fs = this.metaDataWriterFileSystemByBranches.get(branchId);
- if (!fs.exists(metadataOutputPath.getParent())) {
- WriterUtils.mkdirsWithRecursivePermission(fs, metadataOutputPath,
this.permissions.get(branchId));
- }
+ if (!fs.exists(metadataOutputPath.getParent())) {
+ WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs,
metadataOutputPath, this.permissions.get(branchId), retrierConfig);
+ }
//Delete the file if metadata already exists
if (fs.exists(metadataOutputPath)) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index dbe7539..30b77ea 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -59,8 +59,8 @@ public class TimePartitionedDataPublisher extends
BaseDataPublisher {
filePathStr.substring(filePathStr.indexOf(writerOutput.toString()) +
writerOutput.toString().length() + 1);
Path outputPath = new Path(publisherOutput, pathSuffix);
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
outputPath.getParent(),
- this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
outputPath.getParent(),
+ this.permissions.get(branchId), this.retrierConfig);
movePath(parallelRunner, workUnitState, status.getPath(), outputPath,
branchId);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
index 89ecdf6..36bcd49 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
@@ -52,8 +52,8 @@ public class TimestampDataPublisher extends BaseDataPublisher
{
Set<Path> writerOutputPathsMoved) throws IOException {
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
if
(!this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputDir, this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+ publisherOutputDir, this.permissions.get(branchId),
this.retrierConfig);
}
super.publishData(state, branchId, publishSingleTaskData,
writerOutputPathsMoved);
}
@@ -74,8 +74,8 @@ public class TimestampDataPublisher extends BaseDataPublisher
{
Path newDst = new Path(new Path(outputDir, getDbTableName(schemaName)),
timestamp);
if (!this.publisherFileSystemByBranches.get(branchId).exists(newDst)) {
-
WriterUtils.mkdirsWithRecursivePermission(this.publisherFileSystemByBranches.get(branchId),
- newDst.getParent(), this.permissions.get(branchId));
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+ newDst.getParent(), this.permissions.get(branchId),
this.retrierConfig);
}
super.movePath(parallelRunner, state, src, newDst, branchId);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/retry/RetryerFactory.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/retry/RetryerFactory.java
b/gobblin-core/src/main/java/org/apache/gobblin/retry/RetryerFactory.java
deleted file mode 100644
index 0c41d66..0000000
--- a/gobblin-core/src/main/java/org/apache/gobblin/retry/RetryerFactory.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.retry;
-
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.writer.exception.NonTransientException;
-
-/**
- * Factory class that builds Retryer.
- * It's recommended to use with ConfigBuilder so that with State and with
prefix of the config key,
- * user can easily instantiate Retryer.
- *
- * @see GoogleAnalyticsUnsampledExtractor for some examples.
- *
- * @param <T>
- */
-public class RetryerFactory<T> {
- private static final Logger LOG =
LoggerFactory.getLogger(RetryerFactory.class);
- public static final String RETRY_MULTIPLIER = "multiplier";
- public static final String RETRY_INTERVAL_MS = "interval_ms";
- public static final String RETRY_TIME_OUT_MS = "time_out_ms";
- public static final String RETRY_TYPE = "retry_type";
-
- private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
- private static final Config DEFAULTS;
- static {
- RETRY_EXCEPTION_PREDICATE = new Predicate<Throwable>() {
- @Override
- public boolean apply(Throwable t) {
- return !(t instanceof NonTransientException);
- }
- };
-
- Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
- .put(RETRY_TIME_OUT_MS,
TimeUnit.MINUTES.toMillis(5L))
- .put(RETRY_INTERVAL_MS,
TimeUnit.SECONDS.toMillis(30L))
- .put(RETRY_MULTIPLIER, 2L)
- .put(RETRY_TYPE,
RetryType.EXPONENTIAL.name())
- .build();
- DEFAULTS = ConfigFactory.parseMap(configMap);
- }
-
- public static enum RetryType {
- EXPONENTIAL,
- FIXED;
- }
-
- /**
- * Creates new instance of retryer based on the config.
- * Accepted config keys are defined in RetryerFactory as static member
variable.
- * You can use State along with ConfigBuilder and config prefix to build
config.
- *
- * @param config
- * @return
- */
- public static <T> Retryer<T> newInstance(Config config) {
- config = config.withFallback(DEFAULTS);
- RetryType type =
RetryType.valueOf(config.getString(RETRY_TYPE).toUpperCase());
-
- switch (type) {
- case EXPONENTIAL:
- return newExponentialRetryer(config);
- case FIXED:
- return newFixedRetryer(config);
- default:
- throw new IllegalArgumentException(type + " is not supported");
- }
- }
-
- private static <T> Retryer<T> newFixedRetryer(Config config) {
- return RetryerBuilder.<T> newBuilder()
- .retryIfException(RETRY_EXCEPTION_PREDICATE)
-
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
- .build();
- }
-
- private static <T> Retryer<T> newExponentialRetryer(Config config) {
-
- return RetryerBuilder.<T> newBuilder()
- .retryIfException(RETRY_EXCEPTION_PREDICATE)
-
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
-
config.getLong(RETRY_INTERVAL_MS),
-
TimeUnit.MILLISECONDS))
-
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
- .build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
index 2c04003..f19c30c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/RetryWriter.java
@@ -37,12 +37,12 @@ import com.google.common.base.Predicate;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.FinalState;
-import org.apache.gobblin.writer.exception.NonTransientException;
/**
* Retry writer follows decorator pattern that retries on inner writer's
failure.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/main/java/org/apache/gobblin/writer/http/SalesforceRestWriter.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/http/SalesforceRestWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/http/SalesforceRestWriter.java
index 43b7cae..5e001d3 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/http/SalesforceRestWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/http/SalesforceRestWriter.java
@@ -20,9 +20,11 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.converter.http.RestEntry;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
@@ -47,9 +49,6 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import org.apache.gobblin.converter.http.RestEntry;
-import org.apache.gobblin.writer.exception.NonTransientException;
-
/**
* Writes to Salesforce via RESTful API, supporting INSERT_ONLY_NOT_EXIST, and
UPSERT.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
----------------------------------------------------------------------
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
index e0d6883..8d58070 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/RetryWriterTest.java
@@ -16,17 +16,17 @@
*/
package org.apache.gobblin.writer;
-import static org.mockito.Mockito.*;
-
import java.io.IOException;
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.writer.exception.NonTransientException;
import org.apache.gobblin.util.FinalState;
-import org.junit.Assert;
-import org.testng.annotations.Test;
+import static org.mockito.Mockito.*;
@Test(groups = { "gobblin.writer" })
public class RetryWriterTest {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractor.java
----------------------------------------------------------------------
diff --git
a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractor.java
b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractor.java
index f65f74c..ee7a78f 100644
---
a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractor.java
+++
b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractor.java
@@ -35,8 +35,8 @@ import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.services.analytics.Analytics;
-import com.google.api.services.analytics.model.UnsampledReport;
import
com.google.api.services.analytics.Analytics.Management.UnsampledReports.Insert;
+import com.google.api.services.analytics.model.UnsampledReport;
import com.google.api.services.drive.Drive;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
@@ -47,22 +47,23 @@ import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import static org.apache.gobblin.retry.RetryerFactory.*;
-import static org.apache.gobblin.configuration.ConfigurationKeys.*;
-import static
org.apache.gobblin.source.extractor.extract.google.GoogleCommonKeys.*;
-import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledSource.*;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.retry.RetryerFactory;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.filebased.CsvFileDownloader;
import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.writer.exception.NonTransientException;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledSource.*;
+import static
org.apache.gobblin.source.extractor.extract.google.GoogleCommonKeys.*;
+import static org.apache.gobblin.util.retry.RetryerFactory.*;
/**
* Extracts Google Analytics(GA) unsampled report data.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractorTest.java
----------------------------------------------------------------------
diff --git
a/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractorTest.java
b/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractorTest.java
index 6e6dca0..2d6a6c7 100644
---
a/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractorTest.java
+++
b/gobblin-modules/google-ingestion/src/test/java/org/apache/gobblin/source/extractor/extract/google/GoogleAnalyticsUnsampledExtractorTest.java
@@ -34,11 +34,17 @@ import
com.google.api.services.analytics.Analytics.Management.UnsampledReports.G
import com.google.api.services.analytics.model.UnsampledReport;
import
com.google.api.services.analytics.model.UnsampledReport.DriveDownloadDetails;
-import static org.apache.gobblin.retry.RetryerFactory.*;
-import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledExtractor.*;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.source.extractor.Extractor;
-import org.apache.gobblin.writer.exception.NonTransientException;
+
+import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledExtractor.DOWNLOAD_TYPE_GOOGLE_DRIVE;
+import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledExtractor.POLL_RETRY_PREFIX;
+import static
org.apache.gobblin.source.extractor.extract.google.GoogleAnalyticsUnsampledExtractor.ReportCreationStatus;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
+import static org.mockito.Mockito.*;
+
@Test(groups = { "gobblin.source.extractor.google" })
public class GoogleAnalyticsUnsampledExtractorTest {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-utility/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 003729f..cd79bde 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -35,6 +35,7 @@ dependencies {
compile externalDependency.jasypt
compile externalDependency.lombok
compile externalDependency.metricsCore
+ compile externalDependency.guavaretrying
compile externalDependency.guice
compile externalDependency.typesafeConfig
compile externalDependency.commonsPool
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-utility/src/main/java/org/apache/gobblin/exception/NonTransientException.java
----------------------------------------------------------------------
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/exception/NonTransientException.java
b/gobblin-utility/src/main/java/org/apache/gobblin/exception/NonTransientException.java
new file mode 100644
index 0000000..f2b22cd
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/exception/NonTransientException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.exception;
+
+/**
+ * Signals a permanent failure that retrying cannot resolve.
+ */
+public class NonTransientException extends RuntimeException {
+ private static final long serialVersionUID = -973030180704599529L;
+
+ public NonTransientException() {
+ super();
+ }
+
+ public NonTransientException(String message) {
+ super(message);
+ }
+
+ public NonTransientException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public NonTransientException(Throwable t) {
+ super(t);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
----------------------------------------------------------------------
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index 2ac292f..f174979 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -28,10 +28,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import com.github.rholder.retry.Retryer;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
@@ -39,6 +43,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.retry.RetryerFactory;
/**
@@ -46,9 +51,12 @@ import org.apache.gobblin.source.workunit.WorkUnit;
*/
@Slf4j
public class WriterUtils {
+ private static final Logger LOG = Logger.getLogger(WriterUtils.class);
public static final String WRITER_ENCRYPTED_CONFIG_PATH =
ConfigurationKeys.WRITER_PREFIX + ".encrypted";
+ public static final Config NO_RETRY_CONFIG = ConfigFactory.empty();
+
/**
* TABLENAME should be used for jobs that pull from multiple tables/topics
and intend to write the records
* in each table/topic to a separate folder. Otherwise, DEFAULT can be used.
@@ -247,17 +255,40 @@ public class WriterUtils {
* @param perm The permission to be set
* @throws IOException if failing to create dir or set permission.
*/
- public static void mkdirsWithRecursivePermission(FileSystem fs, Path path,
FsPermission perm) throws IOException {
+ public static void mkdirsWithRecursivePermission(final FileSystem fs, final
Path path, FsPermission perm) throws IOException {
+ mkdirsWithRecursivePermissionWithRetry(fs, path, perm, NO_RETRY_CONFIG);
+ }
+
+ public static void mkdirsWithRecursivePermissionWithRetry(final FileSystem
fs, final Path path, FsPermission perm, Config retrierConfig) throws
IOException {
+
if (fs.exists(path)) {
return;
}
+
if (path.getParent() != null && !fs.exists(path.getParent())) {
- mkdirsWithRecursivePermission(fs, path.getParent(), perm);
+ mkdirsWithRecursivePermissionWithRetry(fs, path.getParent(), perm,
retrierConfig);
}
+
if (!fs.mkdirs(path, perm)) {
throw new IOException(String.format("Unable to mkdir %s with permission
%s", path, perm));
}
+ if (retrierConfig != NO_RETRY_CONFIG) {
+ // Wait until the path is visible: on an eventually consistent fs like Amazon S3 it may not exist right away after creation
+ Retryer<Void> retryer = RetryerFactory.newInstance(retrierConfig);
+
+ try {
+ retryer.call(() -> {
+ if (!fs.exists(path)) {
+ throw new IOException("Path " + path + " does not exist however it
should. Will wait more.");
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ throw new IOException("Path " + path + "does not exist however it
should. Giving up..."+ e);
+ }
+ }
+
// Double check permission, since fs.mkdirs() may not guarantee to set the
permission correctly
if (!fs.getFileStatus(path).getPermission().equals(perm)) {
fs.setPermission(path, perm);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
----------------------------------------------------------------------
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
index f1b86ee..995a913 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/CompactionRecordCountProvider.java
@@ -20,12 +20,11 @@ package org.apache.gobblin.util.recordcount;
import java.util.Random;
import java.util.regex.Pattern;
+import org.apache.gobblin.util.RecordCountProvider;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
-import org.apache.gobblin.util.RecordCountProvider;
-
/**
* Implementation of {@link RecordCountProvider}, which provides record count
from file path.
@@ -46,6 +45,13 @@ public class CompactionRecordCountProvider extends
RecordCountProvider {
* Construct the file name as
{filenamePrefix}{recordCount}.{SystemCurrentTimeInMills}.{RandomInteger}{SUFFIX}.
*/
public static String constructFileName(String filenamePrefix, long
recordCount) {
+ return constructFileName(filenamePrefix, SUFFIX, recordCount);
+ }
+
+ /**
+ * Construct the file name as
{filenamePrefix}{recordCount}.{SystemCurrentTimeInMills}.{RandomInteger}{extension}.
+ */
+ public static String constructFileName(String filenamePrefix, String
extension, long recordCount) {
Preconditions.checkArgument(
filenamePrefix.equals(M_OUTPUT_FILE_PREFIX) ||
filenamePrefix.equals(MR_OUTPUT_FILE_PREFIX),
String.format("%s is not a supported prefix, which should be %s, or
%s.", filenamePrefix, M_OUTPUT_FILE_PREFIX,
@@ -57,10 +63,11 @@ public class CompactionRecordCountProvider extends
RecordCountProvider {
sb.append(Long.toString(System.currentTimeMillis()));
sb.append(SEPARATOR);
sb.append(Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE)));
- sb.append(SUFFIX);
+ sb.append(extension);
return sb.toString();
}
+
/**
* Get the record count through filename.
*/
@@ -81,12 +88,11 @@ public class CompactionRecordCountProvider extends
RecordCountProvider {
* This method currently supports converting the given {@link Path} from
{@link IngestionRecordCountProvider}.
* The converted {@link Path} will start with {@link #M_OUTPUT_FILE_PREFIX}.
*/
- @Override
- public Path convertPath(Path path, RecordCountProvider src) {
+ public Path convertPath(Path path, String extension, RecordCountProvider
src) {
if (this.getClass().equals(src.getClass())) {
return path;
} else if (src.getClass().equals(IngestionRecordCountProvider.class)) {
- String newFileName = constructFileName(M_OUTPUT_FILE_PREFIX,
src.getRecordCount(path));
+ String newFileName = constructFileName(M_OUTPUT_FILE_PREFIX, extension,
src.getRecordCount(path));
return new Path(path.getParent(), newFileName);
} else {
throw getNotImplementedException(src);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0e870956/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
----------------------------------------------------------------------
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
new file mode 100644
index 0000000..e793bdd
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.retry;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.exception.NonTransientException;
+
+/**
+ * Factory class that builds Retryer.
+ * It is recommended to build the config with ConfigBuilder from a State and a config-key prefix,
+ * so that a Retryer can be instantiated easily.
+ *
+ * @see GoogleAnalyticsUnsampledExtractor for some examples.
+ *
+ * @param <T>
+ */
+public class RetryerFactory<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RetryerFactory.class);
+ public static final String RETRY_MULTIPLIER = "multiplier";
+ public static final String RETRY_INTERVAL_MS = "interval_ms";
+ public static final String RETRY_TIME_OUT_MS = "time_out_ms";
+ public static final String RETRY_TYPE = "retry_type";
+
+ private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
+ private static final Config DEFAULTS;
+ static {
+ RETRY_EXCEPTION_PREDICATE = new Predicate<Throwable>() {
+ @Override
+ public boolean apply(Throwable t) {
+ return !(t instanceof NonTransientException);
+ }
+ };
+
+ Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
+ .put(RETRY_TIME_OUT_MS,
TimeUnit.MINUTES.toMillis(5L))
+ .put(RETRY_INTERVAL_MS,
TimeUnit.SECONDS.toMillis(30L))
+ .put(RETRY_MULTIPLIER, 2L)
+ .put(RETRY_TYPE,
RetryType.EXPONENTIAL.name())
+ .build();
+ DEFAULTS = ConfigFactory.parseMap(configMap);
+ }
+
+ public static enum RetryType {
+ EXPONENTIAL,
+ FIXED;
+ }
+
+ /**
+ * Creates new instance of retryer based on the config.
+ * Accepted config keys are defined in RetryerFactory as static member variables.
+ * You can use State along with ConfigBuilder and a config prefix to build the config.
+ *
+ * @param config
+ * @return
+ */
+ public static <T> Retryer<T> newInstance(Config config) {
+ config = config.withFallback(DEFAULTS);
+ RetryType type =
RetryType.valueOf(config.getString(RETRY_TYPE).toUpperCase());
+
+ switch (type) {
+ case EXPONENTIAL:
+ return newExponentialRetryer(config);
+ case FIXED:
+ return newFixedRetryer(config);
+ default:
+ throw new IllegalArgumentException(type + " is not supported");
+ }
+ }
+
+ private static <T> Retryer<T> newFixedRetryer(Config config) {
+ return RetryerBuilder.<T> newBuilder()
+ .retryIfException(RETRY_EXCEPTION_PREDICATE)
+
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
+ .build();
+ }
+
+ private static <T> Retryer<T> newExponentialRetryer(Config config) {
+
+ return RetryerBuilder.<T> newBuilder()
+ .retryIfException(RETRY_EXCEPTION_PREDICATE)
+
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
+
config.getLong(RETRY_INTERVAL_MS),
+
TimeUnit.MILLISECONDS))
+
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS))
+ .build();
+ }
+}