This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3f3c9ce Lazily initialize FileContext and do not store a handle of it
so it can be GC'ed when required (#3444)
3f3c9ce is described below
commit 3f3c9ce8b028ec4c68a992e070d48b99068e99f1
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Dec 14 09:31:48 2021 -0800
Lazily initialize FileContext and do not store a handle of it so it can be
GC'ed when required (#3444)
---
.../writer/FileAwareInputStreamDataWriter.java | 48 ++++++++++++----------
1 file changed, 27 insertions(+), 21 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 0c99088..c42d51f 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -17,11 +17,6 @@
package org.apache.gobblin.data.management.copy.writer;
-import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,7 +26,25 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterators;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -62,15 +75,6 @@ import org.apache.gobblin.util.io.StreamCopier;
import org.apache.gobblin.util.io.StreamThrottler;
import org.apache.gobblin.util.io.ThrottledInputStream;
import org.apache.gobblin.writer.DataWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
/**
@@ -98,7 +102,8 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
protected final int bufferSize;
private final boolean checkFileSize;
private final Options.Rename renameOptions;
- private final FileContext fileContext;
+ private final URI uri;
+ private final Configuration conf;
protected final Meter copySpeedMeter;
@@ -136,14 +141,13 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
numBranches, branchId),
ConfigurationKeys.LOCAL_FS_URI);
- Configuration conf = WriterUtils.getFsConfiguration(state);
- URI uri = URI.create(uriStr);
+ this.conf = WriterUtils.getFsConfiguration(state);
+ this.uri = URI.create(uriStr);
if (fileSystem != null) {
this.fs = fileSystem;
} else {
this.fs = FileSystem.get(uri, conf);
}
- this.fileContext = FileContext.getFileContext(uri, conf);
if
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false))
{
this.stagingDir = new
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
} else {
@@ -440,14 +444,16 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
setFilePermissions(copyableFile);
Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt =
- copyableFile.getAncestorsOwnerAndPermission() == null ?
Iterators.<OwnerAndPermission>emptyIterator()
+ copyableFile.getAncestorsOwnerAndPermission() == null ?
Iterators.emptyIterator()
: copyableFile.getAncestorsOwnerAndPermission().iterator();
ensureDirectoryExists(this.fs, outputFilePath.getParent(),
ancestorOwnerAndPermissionIt);
- this.fileContext.rename(stagingFilePath, outputFilePath, renameOptions);
+ // Do not store the FileContext after doing the rename because FileContexts are not cached and a new object
+ // is created for every task's commit
+ FileContext.getFileContext(this.uri, this.conf).rename(stagingFilePath,
outputFilePath, renameOptions);
} catch (IOException ioe) {
- log.error("Could not commit file %s.", outputFilePath);
+ log.error("Could not commit file {}.", outputFilePath);
// persist file
this.recoveryHelper.persistFile(this.state, copyableFile,
stagingFilePath);
throw ioe;