This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 00c60ca64 [GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing
post-publish WUs before they've actually run (#3844)
00c60ca64 is described below
commit 00c60ca6492644652c66905eb384ab145e348bbf
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Dec 7 11:40:27 2023 -0800
[GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing post-publish WUs
before they've actually run (#3844)
* Fix `CopyDataPublisher` to avoid committing post-publish WUs before
they've actually run
* fixup findbugsMain
---
.../copy/publisher/CopyDataPublisher.java | 164 ++++++++++++++-------
1 file changed, 107 insertions(+), 57 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 2c282bf36..56eb78e4f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -20,17 +20,14 @@ package org.apache.gobblin.data.management.copy.publisher;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -56,6 +53,8 @@ import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -71,8 +70,10 @@ import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.publisher.UnpublishedHandling;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
/**
@@ -222,6 +223,80 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
}
}
+ /** Organizes and encapsulates access to {@link WorkUnitState}s according to
useful access patterns. */
+ @AllArgsConstructor
+ private static class WorkUnitStatesHelper {
+ private final Collection<WorkUnitState> workUnitStates;
+
+ public boolean isEmpty() {
+ return workUnitStates.isEmpty();
+ }
+
+ public WorkUnitState getAny() {
+ return workUnitStates.stream().findFirst()
+ .orElseThrow(() -> new RuntimeException("no WorkUnitStates -
pre-check `isEmpty()` next time!"));
+ }
+
+ public Collection<WorkUnitState> getAll() {
+ return workUnitStates;
+ }
+
+ public boolean hasAnyCopyableFile() throws IOException {
+ for (WorkUnitState wus : workUnitStates) {
+ if
(CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public List<CommitStep> getPrePublishSteps() throws IOException {
+ return getCommitSteps(PrePublishStep.class);
+ }
+
+ public List<CommitStep> getPostPublishSteps() throws IOException {
+ return getCommitSteps(PostPublishStep.class);
+ }
+
+ public List<WorkUnitState> getPostPublishStates() throws IOException {
+ return getStatesForCommitStepCopyEntities(PostPublishStep.class);
+ }
+
+ public List<WorkUnitState> getNonPostPublishStates() throws IOException {
+ return getStatesForCopyEntitiesNotOf(PostPublishStep.class);
+ }
+
+ private List<CommitStep> getCommitSteps(Class<? extends
CommitStepCopyEntity> baseClass) throws IOException {
+ return getStatesForCommitStepCopyEntities(baseClass).stream()
+ .map(wus -> (CommitStepCopyEntity)
CopySource.deserializeCopyEntity(wus))
+ .sorted(Comparator.comparingInt(CommitStepCopyEntity::getPriority))
+ .map(CommitStepCopyEntity::getStep)
+ .collect(Collectors.toList());
+ }
+
+ private List<WorkUnitState> getStatesForCommitStepCopyEntities(Class<?
extends CommitStepCopyEntity> baseClass)
+ throws IOException {
+ List<WorkUnitState> states = Lists.newArrayList();
+ for (WorkUnitState wus : workUnitStates) {
+ if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+ states.add(wus);
+ }
+ }
+ return states;
+ }
+
+ private List<WorkUnitState> getStatesForCopyEntitiesNotOf(Class<? extends
CommitStepCopyEntity> exclusionBaseClass)
+ throws IOException {
+ List<WorkUnitState> states = Lists.newArrayList();
+ for (WorkUnitState wus : workUnitStates) {
+ if
(!exclusionBaseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+ states.add(wus);
+ }
+ }
+ return states;
+ }
+ }
+
/**
* Publish data for a {@link CopyableDataset}.
*/
@@ -230,9 +305,10 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
Map<String, String> additionalMetadata = Maps.newHashMap();
Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
- "publishFileSet received an empty collection work units. This is an
error in code.");
+ String.format("[%s] publishFileSet got empty work unit states. This is
an error in code.", datasetAndPartition.identifier()));
- WorkUnitState sampledWorkUnitState =
datasetWorkUnitStates.iterator().next();
+ WorkUnitStatesHelper statesHelper = new
WorkUnitStatesHelper(datasetWorkUnitStates);
+ WorkUnitState sampledWorkUnitState = statesHelper.getAny();
CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
@@ -244,23 +320,23 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
Path datasetWriterOutputPath = new Path(writerOutputDir,
datasetAndPartition.identifier());
log.info("Merging all split work units.");
- DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
+ DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll());
- log.info(String.format("[%s] Publishing fileSet from %s for dataset %s",
datasetAndPartition.identifier(),
- datasetWriterOutputPath, metadata.getDatasetURN()));
+ log.info("[{}] Publishing fileSet from {} for dataset {}",
datasetAndPartition.identifier(),
+ datasetWriterOutputPath, metadata.getDatasetURN());
- List<CommitStep> prePublish = getCommitSequence(datasetWorkUnitStates,
PrePublishStep.class);
- List<CommitStep> postPublish = getCommitSequence(datasetWorkUnitStates,
PostPublishStep.class);
- log.info(String.format("[%s] Found %d prePublish steps and %d postPublish
steps.", datasetAndPartition.identifier(),
- prePublish.size(), postPublish.size()));
+ List<CommitStep> prePublishSteps = statesHelper.getPrePublishSteps();
+ List<CommitStep> postPublishSteps = statesHelper.getPostPublishSteps();
+ log.info("[{}] Found {} pre-publish steps and {} post-publish steps.",
datasetAndPartition.identifier(),
+ prePublishSteps.size(), postPublishSteps.size());
- executeCommitSequence(prePublish);
+ executeCommitSequence(prePublishSteps);
- if (hasCopyableFiles(datasetWorkUnitStates)) {
+ if (statesHelper.hasAnyCopyableFile()) {
// Targets are always absolute, so we start moving from root (will skip
any existing directories).
HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new
Path("/"));
} else {
- log.info(String.format("[%s] No copyable files in dataset. Proceeding to
postpublish steps.", datasetAndPartition.identifier()));
+ log.info("[{}] No copyable files in dataset. Proceeding to post-publish
steps.", datasetAndPartition.identifier());
}
this.fs.delete(datasetWriterOutputPath, true);
@@ -269,7 +345,10 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
long datasetUpstreamTimestamp = Long.MAX_VALUE;
Optional<String> fileSetRoot = Optional.absent();
- for (WorkUnitState wus : datasetWorkUnitStates) {
+ // ensure every successful state is committed
+ // WARNING: this MUST NOT run before the WU is actually executed--hence NOT YET for post-publish steps!
+ // (that's because `WorkUnitState::getWorkingState()` returns `WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs yet to execute)
+ for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
}
@@ -300,9 +379,16 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
}
}
- // execute post publish commit steps after preserving file attributes,
because some post publish step,
- // e.g. SetPermissionCommitStep needs to set permissions
- executeCommitSequence(postPublish);
+ // execute `postPublishSteps` after preserving file attributes, as some,
like `SetPermissionCommitStep`, will themselves set permissions
+ executeCommitSequence(postPublishSteps);
+
+ // since `postPublishSteps` have now executed, finally ready to ensure
every successful WU state of those gets committed
+ for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
+ if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
+ wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ }
+ // NOTE: no need for `CopyableFile`-specific custom handling, as above,
because `PostPublishStep extends CommitStepCopyEntity` and so could not be one
+ }
// if there are no valid values for datasetOriginTimestamp and
datasetUpstreamTimestamp, use
// something more readable
@@ -320,42 +406,6 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl
Long.toString(datasetOriginTimestamp),
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
}
-
- private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits)
throws IOException {
- for (WorkUnitState wus : workUnits) {
- if
(CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
- return true;
- }
- }
- return false;
- }
-
- private static List<CommitStep> getCommitSequence(Collection<WorkUnitState>
workUnits, Class<?> baseClass)
- throws IOException {
- List<CommitStepCopyEntity> steps = Lists.newArrayList();
- for (WorkUnitState wus : workUnits) {
- if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
- CommitStepCopyEntity step = (CommitStepCopyEntity)
CopySource.deserializeCopyEntity(wus);
- steps.add(step);
- }
- }
-
- Comparator<CommitStepCopyEntity> commitStepSorter = new
Comparator<CommitStepCopyEntity>() {
- @Override
- public int compare(CommitStepCopyEntity o1, CommitStepCopyEntity o2) {
- return Integer.compare(o1.getPriority(), o2.getPriority());
- }
- };
-
- Collections.sort(steps, commitStepSorter);
- List<CommitStep> sequence = Lists.newArrayList();
- for (CommitStepCopyEntity entity : steps) {
- sequence.add(entity.getStep());
- }
-
- return sequence;
- }
-
private static void executeCommitSequence(List<CommitStep> steps) throws
IOException {
for (CommitStep step : steps) {
step.execute();