This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 00c60ca64 [GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run (#3844)
00c60ca64 is described below

commit 00c60ca6492644652c66905eb384ab145e348bbf
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Dec 7 11:40:27 2023 -0800

    [GOBBLIN-1972] Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run (#3844)
    
    * Fix `CopyDataPublisher` to avoid committing post-publish WUs before they've actually run
    
    * fixup findbugsMain
---
 .../copy/publisher/CopyDataPublisher.java          | 164 ++++++++++++++-------
 1 file changed, 107 insertions(+), 57 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 2c282bf36..56eb78e4f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -20,17 +20,14 @@ package org.apache.gobblin.data.management.copy.publisher;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.PreserveAttributes;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,6 +53,8 @@ import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
 import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
@@ -71,8 +70,10 @@ import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 
 
 /**
@@ -222,6 +223,80 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     }
   }
 
+  /** Organizes and encapsulates access to {@link WorkUnitState}s according to 
useful access patterns. */
+  @AllArgsConstructor
+  private static class WorkUnitStatesHelper {
+    private final Collection<WorkUnitState> workUnitStates;
+
+    public boolean isEmpty() {
+      return workUnitStates.isEmpty();
+    }
+
+    public WorkUnitState getAny() {
+      return workUnitStates.stream().findFirst()
+          .orElseThrow(() -> new RuntimeException("no WorkUnitStates - 
pre-check `isEmpty()` next time!"));
+    }
+
+    public Collection<WorkUnitState> getAll() {
+      return workUnitStates;
+    }
+
+    public boolean hasAnyCopyableFile() throws IOException {
+      for (WorkUnitState wus : workUnitStates) {
+        if 
(CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public List<CommitStep> getPrePublishSteps() throws IOException {
+      return getCommitSteps(PrePublishStep.class);
+    }
+
+    public List<CommitStep> getPostPublishSteps() throws IOException {
+      return getCommitSteps(PostPublishStep.class);
+    }
+
+    public List<WorkUnitState> getPostPublishStates() throws IOException {
+      return getStatesForCommitStepCopyEntities(PostPublishStep.class);
+    }
+
+    public List<WorkUnitState> getNonPostPublishStates() throws IOException {
+      return getStatesForCopyEntitiesNotOf(PostPublishStep.class);
+    }
+
+    private List<CommitStep> getCommitSteps(Class<? extends 
CommitStepCopyEntity> baseClass) throws IOException {
+      return getStatesForCommitStepCopyEntities(baseClass).stream()
+          .map(wus -> (CommitStepCopyEntity) 
CopySource.deserializeCopyEntity(wus))
+          .sorted(Comparator.comparingInt(CommitStepCopyEntity::getPriority))
+          .map(CommitStepCopyEntity::getStep)
+          .collect(Collectors.toList());
+    }
+
+    private List<WorkUnitState> getStatesForCommitStepCopyEntities(Class<? 
extends CommitStepCopyEntity> baseClass)
+        throws IOException {
+      List<WorkUnitState> states = Lists.newArrayList();
+      for (WorkUnitState wus : workUnitStates) {
+        if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+          states.add(wus);
+        }
+      }
+      return states;
+    }
+
+    private List<WorkUnitState> getStatesForCopyEntitiesNotOf(Class<? extends 
CommitStepCopyEntity> exclusionBaseClass)
+        throws IOException {
+      List<WorkUnitState> states = Lists.newArrayList();
+      for (WorkUnitState wus : workUnitStates) {
+        if 
(!exclusionBaseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
+          states.add(wus);
+        }
+      }
+      return states;
+    }
+  }
+
   /**
    * Publish data for a {@link CopyableDataset}.
    */
@@ -230,9 +305,10 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     Map<String, String> additionalMetadata = Maps.newHashMap();
 
     Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
-        "publishFileSet received an empty collection work units. This is an 
error in code.");
+        String.format("[%s] publishFileSet got empty work unit states. This is 
an error in code.", datasetAndPartition.identifier()));
 
-    WorkUnitState sampledWorkUnitState =  
datasetWorkUnitStates.iterator().next();
+    WorkUnitStatesHelper statesHelper = new 
WorkUnitStatesHelper(datasetWorkUnitStates);
+    WorkUnitState sampledWorkUnitState =  statesHelper.getAny();
 
     CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
         sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
@@ -244,23 +320,23 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     Path datasetWriterOutputPath = new Path(writerOutputDir, 
datasetAndPartition.identifier());
 
     log.info("Merging all split work units.");
-    DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
+    DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, statesHelper.getAll());
 
-    log.info(String.format("[%s] Publishing fileSet from %s for dataset %s", 
datasetAndPartition.identifier(),
-        datasetWriterOutputPath, metadata.getDatasetURN()));
+    log.info("[{}] Publishing fileSet from {} for dataset {}", 
datasetAndPartition.identifier(),
+        datasetWriterOutputPath, metadata.getDatasetURN());
 
-    List<CommitStep> prePublish = getCommitSequence(datasetWorkUnitStates, 
PrePublishStep.class);
-    List<CommitStep> postPublish = getCommitSequence(datasetWorkUnitStates, 
PostPublishStep.class);
-    log.info(String.format("[%s] Found %d prePublish steps and %d postPublish 
steps.", datasetAndPartition.identifier(),
-        prePublish.size(), postPublish.size()));
+    List<CommitStep> prePublishSteps = statesHelper.getPrePublishSteps();
+    List<CommitStep> postPublishSteps = statesHelper.getPostPublishSteps();
+    log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", 
datasetAndPartition.identifier(),
+        prePublishSteps.size(), postPublishSteps.size());
 
-    executeCommitSequence(prePublish);
+    executeCommitSequence(prePublishSteps);
 
-    if (hasCopyableFiles(datasetWorkUnitStates)) {
+    if (statesHelper.hasAnyCopyableFile()) {
       // Targets are always absolute, so we start moving from root (will skip 
any existing directories).
       HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new 
Path("/"));
     } else {
-      log.info(String.format("[%s] No copyable files in dataset. Proceeding to 
postpublish steps.", datasetAndPartition.identifier()));
+      log.info("[{}] No copyable files in dataset. Proceeding to post-publish 
steps.", datasetAndPartition.identifier());
     }
 
     this.fs.delete(datasetWriterOutputPath, true);
@@ -269,7 +345,10 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     long datasetUpstreamTimestamp = Long.MAX_VALUE;
     Optional<String> fileSetRoot = Optional.absent();
 
-    for (WorkUnitState wus : datasetWorkUnitStates) {
+    // ensure every successful state is committed
+    // WARNING: this MUST NOT run before the WU is actually executed--hence 
NOT YET for post-publish steps!
+    // (that's because `WorkUnitState::getWorkingState()` returns 
`WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs 
yet to execute)
+    for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
       if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
         wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
       }
@@ -300,9 +379,16 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
       }
     }
 
-    // execute post publish commit steps after preserving file attributes, 
because some post publish step,
-    // e.g. SetPermissionCommitStep needs to set permissions
-    executeCommitSequence(postPublish);
+    // execute `postPublishSteps` after preserving file attributes, as some, 
like `SetPermissionCommitStep`, will themselves set permissions
+    executeCommitSequence(postPublishSteps);
+
+    // since `postPublishSteps` have now executed, finally ready to ensure 
every successful WU state of those gets committed
+    for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
+      if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
+        wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      }
+      // NOTE: no need for `CopyableFile`-specific custom handling, as above, 
because `PostPublishStep extends CommitStepCopyEntity` and so could not be one
+    }
 
     // if there are no valid values for datasetOriginTimestamp and 
datasetUpstreamTimestamp, use
     // something more readable
@@ -320,42 +406,6 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
         Long.toString(datasetOriginTimestamp), 
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
   }
 
-
-  private static boolean hasCopyableFiles(Collection<WorkUnitState> workUnits) 
throws IOException {
-    for (WorkUnitState wus : workUnits) {
-      if 
(CopyableFile.class.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private static List<CommitStep> getCommitSequence(Collection<WorkUnitState> 
workUnits, Class<?> baseClass)
-      throws IOException {
-    List<CommitStepCopyEntity> steps = Lists.newArrayList();
-    for (WorkUnitState wus : workUnits) {
-      if (baseClass.isAssignableFrom(CopySource.getCopyEntityClass(wus))) {
-        CommitStepCopyEntity step = (CommitStepCopyEntity) 
CopySource.deserializeCopyEntity(wus);
-        steps.add(step);
-      }
-    }
-
-    Comparator<CommitStepCopyEntity> commitStepSorter = new 
Comparator<CommitStepCopyEntity>() {
-      @Override
-      public int compare(CommitStepCopyEntity o1, CommitStepCopyEntity o2) {
-        return Integer.compare(o1.getPriority(), o2.getPriority());
-      }
-    };
-
-    Collections.sort(steps, commitStepSorter);
-    List<CommitStep> sequence = Lists.newArrayList();
-    for (CommitStepCopyEntity entity : steps) {
-      sequence.add(entity.getStep());
-    }
-
-    return sequence;
-  }
-
   private static void executeCommitSequence(List<CommitStep> steps) throws 
IOException {
     for (CommitStep step : steps) {
       step.execute();

Reply via email to