This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f3c9ce  Lazily initialize FileContext and do not store a handle of it so it can be GC'ed when required (#3444)
3f3c9ce is described below

commit 3f3c9ce8b028ec4c68a992e070d48b99068e99f1
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Tue Dec 14 09:31:48 2021 -0800

    Lazily initialize FileContext and do not store a handle of it so it can be GC'ed when required (#3444)
---
 .../writer/FileAwareInputStreamDataWriter.java     | 48 ++++++++++++----------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 0c99088..c42d51f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -17,11 +17,6 @@
 
 package org.apache.gobblin.data.management.copy.writer;
 
-import com.codahale.metrics.Meter;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterators;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,7 +26,25 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterators;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.broker.EmptyKey;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -62,15 +75,6 @@ import org.apache.gobblin.util.io.StreamCopier;
 import org.apache.gobblin.util.io.StreamThrottler;
 import org.apache.gobblin.util.io.ThrottledInputStream;
 import org.apache.gobblin.writer.DataWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -98,7 +102,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   protected final int bufferSize;
   private final boolean checkFileSize;
   private final Options.Rename renameOptions;
-  private final FileContext fileContext;
+  private final URI uri;
+  private final Configuration conf;
 
   protected final Meter copySpeedMeter;
 
@@ -136,14 +141,13 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
         
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
 numBranches, branchId),
         ConfigurationKeys.LOCAL_FS_URI);
 
-    Configuration conf = WriterUtils.getFsConfiguration(state);
-    URI uri = URI.create(uriStr);
+    this.conf = WriterUtils.getFsConfiguration(state);
+    this.uri = URI.create(uriStr);
     if (fileSystem != null) {
       this.fs = fileSystem;
     } else {
       this.fs = FileSystem.get(uri, conf);
     }
-    this.fileContext = FileContext.getFileContext(uri, conf);
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
     } else {
@@ -440,14 +444,16 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
       setFilePermissions(copyableFile);
 
       Iterator<OwnerAndPermission> ancestorOwnerAndPermissionIt =
-          copyableFile.getAncestorsOwnerAndPermission() == null ? 
Iterators.<OwnerAndPermission>emptyIterator()
+          copyableFile.getAncestorsOwnerAndPermission() == null ? 
Iterators.emptyIterator()
               : copyableFile.getAncestorsOwnerAndPermission().iterator();
 
       ensureDirectoryExists(this.fs, outputFilePath.getParent(), 
ancestorOwnerAndPermissionIt);
 
-      this.fileContext.rename(stagingFilePath, outputFilePath, renameOptions);
+      // Do not store the FileContext after doing the rename because 
FileContexts are not cached and a new object
+      // is created for every task's commit
+      FileContext.getFileContext(this.uri, this.conf).rename(stagingFilePath, 
outputFilePath, renameOptions);
     } catch (IOException ioe) {
-      log.error("Could not commit file %s.", outputFilePath);
+      log.error("Could not commit file {}.", outputFilePath);
       // persist file
       this.recoveryHelper.persistFile(this.state, copyableFile, 
stagingFilePath);
       throw ioe;

Reply via email to