This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2afab699de [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066)
2afab699de is described below

commit 2afab699def9e02f99278ca748223c1bfd8901fc
Author: William Lo <lo.willia...@gmail.com>
AuthorDate: Mon Oct 7 21:22:15 2024 -0400

    [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066)
    
    * Support partial commit semantic from copydatapublisher
---
 .../data/management/copy/publisher/CopyDataPublisher.java     | 11 ++++++-----
 .../java/org/apache/gobblin/runtime/SafeDatasetCommit.java    |  4 ++--
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 722f1f3fdd..ef8a0e72e8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -25,9 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,18 +39,21 @@ import com.google.common.collect.Multimap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
 import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -358,8 +358,9 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
       if (copyEntity instanceof CopyableFile) {
         CopyableFile copyableFile = (CopyableFile) copyEntity;
-        preserveFileAttrInPublisher(copyableFile);
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
+          // Committed files should exist in destination otherwise FNFE will be thrown
+          preserveFileAttrInPublisher(copyableFile);
           CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus);
           // Dataset Output path is injected in each copyableFile.
           // This can be optimized by having a dataset level equivalent class for copyable entities
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index e07083ff1d..70a9d9a96e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -175,7 +175,7 @@ public final class SafeDatasetCommit implements Callable<Void> {
           this.datasetState.setState(JobState.RunningState.COMMITTED);
         }
       }
-    }  catch (Throwable throwable) {
+    } catch (Throwable throwable) {
      log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
           this.jobContext.getJobId()), throwable);
       throw new RuntimeException(throwable);
@@ -192,7 +192,7 @@ public final class SafeDatasetCommit implements Callable<Void> {
         }
 
       } catch (IOException | RuntimeException ioe) {
-        log.error(String
+       log.error(String
             .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
             ioe);
         throw new RuntimeException(ioe);

Reply via email to