This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3624a4f  [GOBBLIN-1133] Add 
CompactionSuiteBaseWithConfigurableCompleteAction to make complete action 
configurable
3624a4f is described below

commit 3624a4f1e5bc4b8da56e9413699a49601968815c
Author: Zihan Li <[email protected]>
AuthorDate: Fri May 1 23:30:14 2020 -0700

    [GOBBLIN-1133] Add CompactionSuiteBaseWithConfigurableCompleteAction to 
make complete action configurable
    
    Fix issue where YarnService uses the old token to
    acquire a new container
    
    pull origin master to change the test
    
    remove unintentional change
    
    address comments
    
    refactor class name
    
    [GOBBLIN-1133]Add
    CompactionSuiteBaseWithConfigurableCompleteAction
    to make complete action configurable
    
    reformat code
    
    address comments
    
    Closes #2973 from ZihanLi58/GOBBLIN-1133
---
 .../CompactionCompleteFileOperationAction.java     | 75 +++++++++++++---------
 .../mapreduce/CompactionJobConfigurator.java       | 18 +++++-
 .../gobblin/compaction/suite/CompactionSuite.java  |  2 +-
 .../compaction/suite/CompactionSuiteBase.java      | 33 ++++------
 ...ionSuiteBaseWithConfigurableCompleteAction.java | 64 ++++++++++++++++++
 ...eBaseWithConfigurableCompleteActionFactory.java | 28 ++++++++
 .../gobblin/yarn/YarnContainerSecurityManager.java |  1 +
 7 files changed, 170 insertions(+), 51 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 02e0578..e4a4cbf 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -20,10 +20,15 @@ package org.apache.gobblin.compaction.action;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -33,7 +38,6 @@ import 
org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.HadoopUtils;
@@ -59,7 +63,7 @@ public class CompactionCompleteFileOperationAction implements 
CompactionComplete
   private EventSubmitter eventSubmitter;
   private FileSystem fs;
 
-  public CompactionCompleteFileOperationAction (State state, 
CompactionJobConfigurator configurator) {
+  public CompactionCompleteFileOperationAction(State state, 
CompactionJobConfigurator configurator) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " 
only supports workunit state");
     }
@@ -73,7 +77,7 @@ public class CompactionCompleteFileOperationAction implements 
CompactionComplete
    * Replace or append the destination folder with new files from map-reduce 
job
    * Create a record count file containing the number of records that have 
been processed .
    */
-  public void onCompactionJobComplete (FileSystemDataset dataset) throws 
IOException {
+  public void onCompactionJobComplete(FileSystemDataset dataset) throws 
IOException {
     if (dataset.isVirtual()) {
       return;
     }
@@ -81,35 +85,34 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
     if (configurator != null && configurator.isJobCreated()) {
       CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
       Path tmpPath = configurator.getMrOutputPath();
-      Path dstPath = new Path (result.getDstAbsoluteDir());
+      Path dstPath = new Path(result.getDstAbsoluteDir());
 
       // this is append delta mode due to the compaction rename source dir 
mode being enabled
       boolean appendDeltaOutput = 
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
-              MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
+          MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
 
       Job job = this.configurator.getConfiguredJob();
 
       long newTotalRecords = 0;
-      long oldTotalRecords = helper.readRecordCount(new Path 
(result.getDstAbsoluteDir()));
-      long executeCount = helper.readExecutionCount (new Path 
(result.getDstAbsoluteDir()));
+      long oldTotalRecords = helper.readRecordCount(new 
Path(result.getDstAbsoluteDir()));
+      long executeCount = helper.readExecutionCount(new 
Path(result.getDstAbsoluteDir()));
 
       List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job, 
tmpPath, this.fs,
           ImmutableList.of(configurator.getFileExtension()));
-
+      HashSet<Path> outputFiles = new HashSet<>();
       if (appendDeltaOutput) {
-        FsPermission permission = 
HadoopUtils.deserializeFsPermission(this.state,
-                MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
+        FsPermission permission =
+            HadoopUtils.deserializeFsPermission(this.state, 
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                 FsPermission.getDefault());
         WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath, 
permission);
         // append files under mr output to destination
-        for (Path filePath: goodPaths) {
+        for (Path filePath : goodPaths) {
           String fileName = filePath.getName();
           log.info(String.format("Adding %s to %s", filePath.toString(), 
dstPath));
-          Path outPath = new Path (dstPath, fileName);
+          Path outPath = new Path(dstPath, fileName);
 
           if (!this.fs.rename(filePath, outPath)) {
-            throw new IOException(
-                    String.format("Unable to move %s to %s", 
filePath.toString(), outPath.toString()));
+            throw new IOException(String.format("Unable to move %s to %s", 
filePath.toString(), outPath.toString()));
           }
         }
 
@@ -120,15 +123,21 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         // (all previous run + current run) is possible.
         newTotalRecords = this.configurator.getFileNameRecordCount();
       } else {
+        this.configurator.getOldFiles()
+            .addAll(
+                DatasetHelper.getApplicableFilePaths(this.fs, dstPath, 
Arrays.asList(configurator.getFileExtension()))
+                    .stream()
+                    .filter(Objects::nonNull)
+                    .map(Path::toString)
+                    .collect(Collectors.toList()));
         this.fs.delete(dstPath, true);
-        FsPermission permission = 
HadoopUtils.deserializeFsPermission(this.state,
-                MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
+        FsPermission permission =
+            HadoopUtils.deserializeFsPermission(this.state, 
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
                 FsPermission.getDefault());
 
         WriterUtils.mkdirsWithRecursivePermission(this.fs, 
dstPath.getParent(), permission);
         if (!this.fs.rename(tmpPath, dstPath)) {
-          throw new IOException(
-                  String.format("Unable to move %s to %s", tmpPath, dstPath));
+          throw new IOException(String.format("Unable to move %s to %s", 
tmpPath, dstPath));
         }
 
         // Obtain record count from map reduce job counter
@@ -138,34 +147,40 @@ public class CompactionCompleteFileOperationAction 
implements CompactionComplete
         Counter counter = 
job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
         newTotalRecords = counter.getValue();
       }
+      goodPaths.stream().forEach(p -> {
+        String fileName = p.getName();
+        outputFiles.add(new Path(dstPath, fileName));
+      });
+      this.configurator.setDstNewFiles(outputFiles);
 
-      State compactState = helper.loadState(new Path 
(result.getDstAbsoluteDir()));
+      State compactState = helper.loadState(new 
Path(result.getDstAbsoluteDir()));
       compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL, 
Long.toString(newTotalRecords));
       compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL, 
Long.toString(executeCount + 1));
-      compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID, 
this.configurator.getConfiguredJob().getJobID().toString());
-      helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
+      compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
+          this.configurator.getConfiguredJob().getJobID().toString());
+      helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
 
-      log.info("Updating record count from {} to {} in {} [{}]", 
oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
+      log.info("Updating record count from {} to {} in {} [{}]", 
oldTotalRecords, newTotalRecords, dstPath,
+          executeCount + 1);
 
       // submit events for record count
       if (eventSubmitter != null) {
-        Map<String, String> eventMetadataMap = 
ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
-            CompactionSlaEventHelper.RECORD_COUNT_TOTAL, 
Long.toString(newTotalRecords),
-            CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, 
Long.toString(oldTotalRecords),
-            CompactionSlaEventHelper.EXEC_COUNT_TOTAL, 
Long.toString(executeCount + 1),
-            CompactionSlaEventHelper.MR_JOB_ID, 
this.configurator.getConfiguredJob().getJobID().toString());
+        Map<String, String> eventMetadataMap =
+            ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, 
dataset.datasetURN(),
+                CompactionSlaEventHelper.RECORD_COUNT_TOTAL, 
Long.toString(newTotalRecords),
+                CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL, 
Long.toString(oldTotalRecords),
+                CompactionSlaEventHelper.EXEC_COUNT_TOTAL, 
Long.toString(executeCount + 1),
+                CompactionSlaEventHelper.MR_JOB_ID, 
this.configurator.getConfiguredJob().getJobID().toString());
         
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT,
 eventMetadataMap);
       }
     }
   }
 
-
-
   public void addEventSubmitter(EventSubmitter eventSubmitter) {
     this.eventSubmitter = eventSubmitter;
   }
 
-  public String getName () {
+  public String getName() {
     return CompactionCompleteFileOperationAction.class.getName();
   }
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index a96a99b..ef3e3a0 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -24,12 +24,15 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.math3.primes.Primes;
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
@@ -89,6 +92,13 @@ public abstract class CompactionJobConfigurator {
   protected boolean isJobCreated = false;
   @Getter
   protected Collection<Path> mapReduceInputPaths = null;
+  //All the old files, which is needed when emit GMCE to register iceberg data
+  @Getter
+  protected Collection<String> oldFiles = null;
+  //All the new files in the final publish dir, which is needed when emit GMCE 
to register iceberg data
+  @Getter
+  @Setter
+  protected Collection<Path> dstNewFiles = null;
   @Getter
   protected long fileNameRecordCount = 0;
 
@@ -116,6 +126,7 @@ public abstract class CompactionJobConfigurator {
   }
 
   public abstract String getFileExtension();
+
   /**
    * Customized MR job creation for Avro.
    *
@@ -246,8 +257,13 @@ public abstract class CompactionJobConfigurator {
       this.mapReduceInputPaths.add(dataset.datasetRoot());
       emptyDirectoryFlag = true;
     }
-
+    this.oldFiles = new HashSet<>();
     for (Path path : mapReduceInputPaths) {
+      oldFiles.addAll(DatasetHelper.getApplicableFilePaths(this.fs, path, 
Arrays.asList(getFileExtension()))
+          .stream()
+          .filter(Objects::nonNull)
+          .map(Path::toString)
+          .collect(Collectors.toList()));
       FileInputFormat.addInputPath(job, path);
     }
 
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index e9e3e50..78c4844 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -79,6 +79,6 @@ public interface CompactionSuite<D extends Dataset> {
   /**
    * Get a list of completion actions after compaction is finished. Actions 
are listed in order
    */
-  List<CompactionCompleteAction<D>> getCompactionCompleteActions();
+  List<CompactionCompleteAction<D>> getCompactionCompleteActions() throws 
IOException;
 
 }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index 6aa0c53..2bb2360 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -17,23 +17,17 @@
 
 package org.apache.gobblin.compaction.suite;
 
+import com.google.gson.Gson;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-
-import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.gson.Gson;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import 
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
 import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
 import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
 import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
 import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
@@ -42,6 +36,7 @@ import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+import org.apache.hadoop.mapreduce.Job;
 
 
 /**
@@ -95,7 +90,7 @@ public class CompactionSuiteBase implements 
CompactionSuite<FileSystemDataset> {
    * @param dataset A dataset needs serialization
    * @param state   A state that is used to save {@link 
org.apache.gobblin.dataset.Dataset}
    */
-  public void save (FileSystemDataset dataset, State state) {
+  public void save(FileSystemDataset dataset, State state) {
     state.setProp(SERIALIZED_DATASET, GSON.toJson(dataset));
   }
 
@@ -105,22 +100,22 @@ public class CompactionSuiteBase implements 
CompactionSuite<FileSystemDataset> {
    * @param state a type of {@link org.apache.gobblin.runtime.TaskState}
    * @return A new instance of {@link FileSystemDataset}
    */
-  public FileSystemDataset load (final State state) {
+  public FileSystemDataset load(final State state) {
     return GSON.fromJson(state.getProp(SERIALIZED_DATASET), 
FileSystemDataset.class);
   }
 
   /**
    * Some post actions are required after compaction job (map-reduce) is 
finished.
    *
-   * @return  A list of {@link CompactionCompleteAction}s which needs to be 
executed after
+   * @return A list of {@link CompactionCompleteAction}s which needs to be 
executed after
    *          map-reduce is done.
    */
-  public List<CompactionCompleteAction<FileSystemDataset>> 
getCompactionCompleteActions() {
-    ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new 
ArrayList<>();
-    array.add(new CompactionCompleteFileOperationAction(state, 
getConfigurator()));
-    array.add(new CompactionHiveRegistrationAction(state));
-    array.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
-    return array;
+  public List<CompactionCompleteAction<FileSystemDataset>> 
getCompactionCompleteActions() throws IOException {
+    ArrayList<CompactionCompleteAction<FileSystemDataset>> 
compactionCompleteActionsList = new ArrayList<>();
+    compactionCompleteActionsList.add(new 
CompactionCompleteFileOperationAction(state, getConfigurator()));
+    compactionCompleteActionsList.add(new 
CompactionHiveRegistrationAction(state));
+    compactionCompleteActionsList.add(new CompactionMarkDirectoryAction(state, 
getConfigurator()));
+    return compactionCompleteActionsList;
   }
 
   /**
@@ -130,13 +125,13 @@ public class CompactionSuiteBase implements 
CompactionSuite<FileSystemDataset> {
    * @param  dataset a top level input path which contains all files those 
need to be compacted
    * @return a map-reduce job which will compact files against {@link 
org.apache.gobblin.dataset.Dataset}
    */
-  public Job createJob (FileSystemDataset dataset) throws IOException {
+  public Job createJob(FileSystemDataset dataset) throws IOException {
     return getConfigurator().createJob(dataset);
   }
 
   protected CompactionJobConfigurator getConfigurator() {
     if (configurator == null) {
-      synchronized(this) {
+      synchronized (this) {
         configurator = 
CompactionJobConfigurator.instantiateConfigurator(this.state);
       }
     }
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
new file mode 100644
index 0000000..c663421
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Compaction suite with configurable complete actions
+ */
+public class CompactionSuiteBaseWithConfigurableCompleteAction extends 
CompactionSuiteBase {
+
+  private final static String COMPACTION_COMPLETE_ACTIONS = 
"compaction.complete.actions";
+
+  /**
+   * Constructor
+   */
+  public CompactionSuiteBaseWithConfigurableCompleteAction(State state) {
+    super(state);
+  }
+
+  /**
+   * Some post actions are required after compaction job (map-reduce) is 
finished.
+   *
+   * @return A list of {@link CompactionCompleteAction}s which needs to be 
executed after
+   *          map-reduce is done.
+   */
+  @Override
+  public List<CompactionCompleteAction<FileSystemDataset>> 
getCompactionCompleteActions() throws IOException {
+    Preconditions.checkArgument(state.contains(COMPACTION_COMPLETE_ACTIONS));
+    ArrayList<CompactionCompleteAction<FileSystemDataset>> 
compactionCompleteActionsList = new ArrayList<>();
+    try {
+      for (String s : state.getPropAsList(COMPACTION_COMPLETE_ACTIONS)) {
+        
compactionCompleteActionsList.add((CompactionCompleteAction<FileSystemDataset>) 
GobblinConstructorUtils.invokeLongestConstructor(
+            Class.forName(s), state, getConfigurator()));
+      }
+    } catch (ReflectiveOperationException e) {
+      throw new IOException(e);
+    }
+    return compactionCompleteActionsList;
+  }
+}
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
new file mode 100644
index 0000000..9b07a52
--- /dev/null
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import org.apache.gobblin.configuration.State;
+
+
+public class CompactionSuiteBaseWithConfigurableCompleteActionFactory extends 
CompactionSuiteBaseFactory {
+  public CompactionSuiteBaseWithConfigurableCompleteAction createSuite(State 
state) {
+    return new CompactionSuiteBaseWithConfigurableCompleteAction(state);
+  }
+}
+
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index f98f7d9..3494daf 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -92,6 +92,7 @@ public class YarnContainerSecurityManager extends 
AbstractIdleService {
   @Override
   protected void shutDown() throws Exception {
     // Nothing to do
+    LOGGER.info("Attempt to shut down YarnContainerSecurityManager");
   }
 
   /**

Reply via email to