This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2afab699de [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066)
2afab699de is described below
commit 2afab699def9e02f99278ca748223c1bfd8901fc
Author: William Lo <[email protected]>
AuthorDate: Mon Oct 7 21:22:15 2024 -0400
[GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (#4066)
* Support partial commit semantics from CopyDataPublisher
---
.../data/management/copy/publisher/CopyDataPublisher.java | 11 ++++++-----
.../java/org/apache/gobblin/runtime/SafeDatasetCommit.java | 4 ++--
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 722f1f3fdd..ef8a0e72e8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -25,9 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,18 +39,21 @@ import com.google.common.collect.Multimap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -358,8 +358,9 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile) {
CopyableFile copyableFile = (CopyableFile) copyEntity;
- preserveFileAttrInPublisher(copyableFile);
if (wus.getWorkingState() == WorkingState.COMMITTED) {
+ // Committed files should exist in destination otherwise FNFE will
be thrown
+ preserveFileAttrInPublisher(copyableFile);
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter,
copyableFile, wus);
// Dataset Output path is injected in each copyableFile.
// This can be optimized by having a dataset level equivalent class
for copyable entities
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index e07083ff1d..70a9d9a96e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -175,7 +175,7 @@ public final class SafeDatasetCommit implements Callable<Void> {
this.datasetState.setState(JobState.RunningState.COMMITTED);
}
}
- } catch (Throwable throwable) {
+ } catch (Throwable throwable) {
log.error(String.format("Failed to commit dataset state for dataset %s
of job %s", this.datasetUrn,
this.jobContext.getJobId()), throwable);
throw new RuntimeException(throwable);
@@ -192,7 +192,7 @@ public final class SafeDatasetCommit implements Callable<Void> {
}
} catch (IOException | RuntimeException ioe) {
- log.error(String
+ log.error(String
.format("Failed to persist dataset state for dataset %s of job
%s", datasetUrn, this.jobContext.getJobId()),
ioe);
throw new RuntimeException(ioe);