This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 2afab699de [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066) 2afab699de is described below commit 2afab699def9e02f99278ca748223c1bfd8901fc Author: William Lo <lo.willia...@gmail.com> AuthorDate: Mon Oct 7 21:22:15 2024 -0400 [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066) * Support partial commit semantic from copydatapublisher --- .../data/management/copy/publisher/CopyDataPublisher.java | 11 ++++++----- .../java/org/apache/gobblin/runtime/SafeDatasetCommit.java | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 722f1f3fdd..ef8a0e72e8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -25,9 +25,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,18 +39,21 @@ import com.google.common.collect.Multimap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.configuration.WorkUnitState.WorkingState; +import 
org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.CopySource; import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata; import org.apache.gobblin.data.management.copy.CopyableFile; -import org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.PreserveAttributes; import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity; import org.apache.gobblin.data.management.copy.entities.PostPublishStep; @@ -358,8 +358,9 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus); if (copyEntity instanceof CopyableFile) { CopyableFile copyableFile = (CopyableFile) copyEntity; - preserveFileAttrInPublisher(copyableFile); if (wus.getWorkingState() == WorkingState.COMMITTED) { + // Committed files should exist in destination otherwise FNFE will be thrown + preserveFileAttrInPublisher(copyableFile); CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus); // Dataset Output path is injected in each copyableFile. 
// This can be optimized by having a dataset level equivalent class for copyable entities diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index e07083ff1d..70a9d9a96e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -175,7 +175,7 @@ public final class SafeDatasetCommit implements Callable<Void> { this.datasetState.setState(JobState.RunningState.COMMITTED); } } - } catch (Throwable throwable) { + } catch (Throwable throwable) { log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), throwable); throw new RuntimeException(throwable); @@ -192,7 +192,7 @@ public final class SafeDatasetCommit implements Callable<Void> { } } catch (IOException | RuntimeException ioe) { - log.error(String + log.error(String .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()), ioe); throw new RuntimeException(ioe);