This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3624a4f [GOBBLIN-1133] Add
CompactionSuiteBaseWithConfigurableCompleteAction to make complete action
configurable
3624a4f is described below
commit 3624a4f1e5bc4b8da56e9413699a49601968815c
Author: Zihan Li <[email protected]>
AuthorDate: Fri May 1 23:30:14 2020 -0700
[GOBBLIN-1133] Add CompactionSuiteBaseWithConfigurableCompleteAction to
make complete action configurable
Fix Issue that YarnService use the old token to
acquire new container
pull origin master to change the test
remove unintentional change
address comments
refactor class name
[GOBBLIN-1133] Add
CompactionSuiteBaseWithConfigurableCompleteAction
to make complete action configurable
reformat code
address comments
Closes #2973 from ZihanLi58/GOBBLIN-1133
---
.../CompactionCompleteFileOperationAction.java | 75 +++++++++++++---------
.../mapreduce/CompactionJobConfigurator.java | 18 +++++-
.../gobblin/compaction/suite/CompactionSuite.java | 2 +-
.../compaction/suite/CompactionSuiteBase.java | 33 ++++------
...ionSuiteBaseWithConfigurableCompleteAction.java | 64 ++++++++++++++++++
...eBaseWithConfigurableCompleteActionFactory.java | 28 ++++++++
.../gobblin/yarn/YarnContainerSecurityManager.java | 1 +
7 files changed, 170 insertions(+), 51 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index 02e0578..e4a4cbf 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -20,10 +20,15 @@ package org.apache.gobblin.compaction.action;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -33,7 +38,6 @@ import
org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.HadoopUtils;
@@ -59,7 +63,7 @@ public class CompactionCompleteFileOperationAction implements
CompactionComplete
private EventSubmitter eventSubmitter;
private FileSystem fs;
- public CompactionCompleteFileOperationAction (State state,
CompactionJobConfigurator configurator) {
+ public CompactionCompleteFileOperationAction(State state,
CompactionJobConfigurator configurator) {
if (!(state instanceof WorkUnitState)) {
throw new UnsupportedOperationException(this.getClass().getName() + "
only supports workunit state");
}
@@ -73,7 +77,7 @@ public class CompactionCompleteFileOperationAction implements
CompactionComplete
* Replace or append the destination folder with new files from map-reduce
job
* Create a record count file containing the number of records that have
been processed .
*/
- public void onCompactionJobComplete (FileSystemDataset dataset) throws
IOException {
+ public void onCompactionJobComplete(FileSystemDataset dataset) throws
IOException {
if (dataset.isVirtual()) {
return;
}
@@ -81,35 +85,34 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
if (configurator != null && configurator.isJobCreated()) {
CompactionPathParser.CompactionParserResult result = new
CompactionPathParser(state).parse(dataset);
Path tmpPath = configurator.getMrOutputPath();
- Path dstPath = new Path (result.getDstAbsoluteDir());
+ Path dstPath = new Path(result.getDstAbsoluteDir());
// this is append delta mode due to the compaction rename source dir
mode being enabled
boolean appendDeltaOutput =
this.state.getPropAsBoolean(MRCompactor.COMPACTION_RENAME_SOURCE_DIR_ENABLED,
- MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
+ MRCompactor.DEFAULT_COMPACTION_RENAME_SOURCE_DIR_ENABLED);
Job job = this.configurator.getConfiguredJob();
long newTotalRecords = 0;
- long oldTotalRecords = helper.readRecordCount(new Path
(result.getDstAbsoluteDir()));
- long executeCount = helper.readExecutionCount (new Path
(result.getDstAbsoluteDir()));
+ long oldTotalRecords = helper.readRecordCount(new
Path(result.getDstAbsoluteDir()));
+ long executeCount = helper.readExecutionCount(new
Path(result.getDstAbsoluteDir()));
List<Path> goodPaths = CompactionJobConfigurator.getGoodFiles(job,
tmpPath, this.fs,
ImmutableList.of(configurator.getFileExtension()));
-
+ HashSet<Path> outputFiles = new HashSet<>();
if (appendDeltaOutput) {
- FsPermission permission =
HadoopUtils.deserializeFsPermission(this.state,
- MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
+ FsPermission permission =
+ HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs, dstPath,
permission);
// append files under mr output to destination
- for (Path filePath: goodPaths) {
+ for (Path filePath : goodPaths) {
String fileName = filePath.getName();
log.info(String.format("Adding %s to %s", filePath.toString(),
dstPath));
- Path outPath = new Path (dstPath, fileName);
+ Path outPath = new Path(dstPath, fileName);
if (!this.fs.rename(filePath, outPath)) {
- throw new IOException(
- String.format("Unable to move %s to %s",
filePath.toString(), outPath.toString()));
+ throw new IOException(String.format("Unable to move %s to %s",
filePath.toString(), outPath.toString()));
}
}
@@ -120,15 +123,21 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
// (all previous run + current run) is possible.
newTotalRecords = this.configurator.getFileNameRecordCount();
} else {
+ this.configurator.getOldFiles()
+ .addAll(
+ DatasetHelper.getApplicableFilePaths(this.fs, dstPath,
Arrays.asList(configurator.getFileExtension()))
+ .stream()
+ .filter(Objects::nonNull)
+ .map(Path::toString)
+ .collect(Collectors.toList()));
this.fs.delete(dstPath, true);
- FsPermission permission =
HadoopUtils.deserializeFsPermission(this.state,
- MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
+ FsPermission permission =
+ HadoopUtils.deserializeFsPermission(this.state,
MRCompactorJobRunner.COMPACTION_JOB_OUTPUT_DIR_PERMISSION,
FsPermission.getDefault());
WriterUtils.mkdirsWithRecursivePermission(this.fs,
dstPath.getParent(), permission);
if (!this.fs.rename(tmpPath, dstPath)) {
- throw new IOException(
- String.format("Unable to move %s to %s", tmpPath, dstPath));
+ throw new IOException(String.format("Unable to move %s to %s",
tmpPath, dstPath));
}
// Obtain record count from map reduce job counter
@@ -138,34 +147,40 @@ public class CompactionCompleteFileOperationAction
implements CompactionComplete
Counter counter =
job.getCounters().findCounter(RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
newTotalRecords = counter.getValue();
}
+ goodPaths.stream().forEach(p -> {
+ String fileName = p.getName();
+ outputFiles.add(new Path(dstPath, fileName));
+ });
+ this.configurator.setDstNewFiles(outputFiles);
- State compactState = helper.loadState(new Path
(result.getDstAbsoluteDir()));
+ State compactState = helper.loadState(new
Path(result.getDstAbsoluteDir()));
compactState.setProp(CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords));
compactState.setProp(CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executeCount + 1));
- compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
- helper.saveState(new Path (result.getDstAbsoluteDir()), compactState);
+ compactState.setProp(CompactionSlaEventHelper.MR_JOB_ID,
+ this.configurator.getConfiguredJob().getJobID().toString());
+ helper.saveState(new Path(result.getDstAbsoluteDir()), compactState);
- log.info("Updating record count from {} to {} in {} [{}]",
oldTotalRecords, newTotalRecords, dstPath, executeCount + 1);
+ log.info("Updating record count from {} to {} in {} [{}]",
oldTotalRecords, newTotalRecords, dstPath,
+ executeCount + 1);
// submit events for record count
if (eventSubmitter != null) {
- Map<String, String> eventMetadataMap =
ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
- CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords),
- CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL,
Long.toString(oldTotalRecords),
- CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executeCount + 1),
- CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
+ Map<String, String> eventMetadataMap =
+ ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN,
dataset.datasetURN(),
+ CompactionSlaEventHelper.RECORD_COUNT_TOTAL,
Long.toString(newTotalRecords),
+ CompactionSlaEventHelper.PREV_RECORD_COUNT_TOTAL,
Long.toString(oldTotalRecords),
+ CompactionSlaEventHelper.EXEC_COUNT_TOTAL,
Long.toString(executeCount + 1),
+ CompactionSlaEventHelper.MR_JOB_ID,
this.configurator.getConfiguredJob().getJobID().toString());
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_RECORD_COUNT_EVENT,
eventMetadataMap);
}
}
}
-
-
public void addEventSubmitter(EventSubmitter eventSubmitter) {
this.eventSubmitter = eventSubmitter;
}
- public String getName () {
+ public String getName() {
return CompactionCompleteFileOperationAction.class.getName();
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
index a96a99b..ef3e3a0 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
@@ -24,12 +24,15 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
@@ -89,6 +92,13 @@ public abstract class CompactionJobConfigurator {
protected boolean isJobCreated = false;
@Getter
protected Collection<Path> mapReduceInputPaths = null;
+ //All the old files, which is needed when emit GMCE to register iceberg data
+ @Getter
+ protected Collection<String> oldFiles = null;
+ //All the new files in the final publish dir, which is needed when emit GMCE
to register iceberg data
+ @Getter
+ @Setter
+ protected Collection<Path> dstNewFiles = null;
@Getter
protected long fileNameRecordCount = 0;
@@ -116,6 +126,7 @@ public abstract class CompactionJobConfigurator {
}
public abstract String getFileExtension();
+
/**
* Customized MR job creation for Avro.
*
@@ -246,8 +257,13 @@ public abstract class CompactionJobConfigurator {
this.mapReduceInputPaths.add(dataset.datasetRoot());
emptyDirectoryFlag = true;
}
-
+ this.oldFiles = new HashSet<>();
for (Path path : mapReduceInputPaths) {
+ oldFiles.addAll(DatasetHelper.getApplicableFilePaths(this.fs, path,
Arrays.asList(getFileExtension()))
+ .stream()
+ .filter(Objects::nonNull)
+ .map(Path::toString)
+ .collect(Collectors.toList()));
FileInputFormat.addInputPath(job, path);
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index e9e3e50..78c4844 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -79,6 +79,6 @@ public interface CompactionSuite<D extends Dataset> {
/**
* Get a list of completion actions after compaction is finished. Actions
are listed in order
*/
- List<CompactionCompleteAction<D>> getCompactionCompleteActions();
+ List<CompactionCompleteAction<D>> getCompactionCompleteActions() throws
IOException;
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
index 6aa0c53..2bb2360 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBase.java
@@ -17,23 +17,17 @@
package org.apache.gobblin.compaction.suite;
+import com.google.gson.Gson;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-
-import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
-
-import org.apache.hadoop.mapreduce.Job;
-
-import com.google.gson.Gson;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.compaction.action.CompactionCompleteAction;
import
org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
+import org.apache.gobblin.compaction.mapreduce.CompactionJobConfigurator;
import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
@@ -42,6 +36,7 @@ import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+import org.apache.hadoop.mapreduce.Job;
/**
@@ -95,7 +90,7 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @param dataset A dataset needs serialization
* @param state A state that is used to save {@link
org.apache.gobblin.dataset.Dataset}
*/
- public void save (FileSystemDataset dataset, State state) {
+ public void save(FileSystemDataset dataset, State state) {
state.setProp(SERIALIZED_DATASET, GSON.toJson(dataset));
}
@@ -105,22 +100,22 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @param state a type of {@link org.apache.gobblin.runtime.TaskState}
* @return A new instance of {@link FileSystemDataset}
*/
- public FileSystemDataset load (final State state) {
+ public FileSystemDataset load(final State state) {
return GSON.fromJson(state.getProp(SERIALIZED_DATASET),
FileSystemDataset.class);
}
/**
* Some post actions are required after compaction job (map-reduce) is
finished.
*
- * @return A list of {@link CompactionCompleteAction}s which needs to be
executed after
+ * @return A list of {@link CompactionCompleteAction}s which needs to be
executed after
* map-reduce is done.
*/
- public List<CompactionCompleteAction<FileSystemDataset>>
getCompactionCompleteActions() {
- ArrayList<CompactionCompleteAction<FileSystemDataset>> array = new
ArrayList<>();
- array.add(new CompactionCompleteFileOperationAction(state,
getConfigurator()));
- array.add(new CompactionHiveRegistrationAction(state));
- array.add(new CompactionMarkDirectoryAction(state, getConfigurator()));
- return array;
+ public List<CompactionCompleteAction<FileSystemDataset>>
getCompactionCompleteActions() throws IOException {
+ ArrayList<CompactionCompleteAction<FileSystemDataset>>
compactionCompleteActionsList = new ArrayList<>();
+ compactionCompleteActionsList.add(new
CompactionCompleteFileOperationAction(state, getConfigurator()));
+ compactionCompleteActionsList.add(new
CompactionHiveRegistrationAction(state));
+ compactionCompleteActionsList.add(new CompactionMarkDirectoryAction(state,
getConfigurator()));
+ return compactionCompleteActionsList;
}
/**
@@ -130,13 +125,13 @@ public class CompactionSuiteBase implements
CompactionSuite<FileSystemDataset> {
* @param dataset a top level input path which contains all files those
need to be compacted
* @return a map-reduce job which will compact files against {@link
org.apache.gobblin.dataset.Dataset}
*/
- public Job createJob (FileSystemDataset dataset) throws IOException {
+ public Job createJob(FileSystemDataset dataset) throws IOException {
return getConfigurator().createJob(dataset);
}
protected CompactionJobConfigurator getConfigurator() {
if (configurator == null) {
- synchronized(this) {
+ synchronized (this) {
configurator =
CompactionJobConfigurator.instantiateConfigurator(this.state);
}
}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
new file mode 100644
index 0000000..c663421
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteAction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.gobblin.compaction.action.CompactionCompleteAction;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Compaction suite with configurable complete actions
+ */
+public class CompactionSuiteBaseWithConfigurableCompleteAction extends
CompactionSuiteBase {
+
+ private final static String COMPACTION_COMPLETE_ACTIONS =
"compaction.complete.actions";
+
+ /**
+ * Constructor
+ */
+ public CompactionSuiteBaseWithConfigurableCompleteAction(State state) {
+ super(state);
+ }
+
+ /**
+ * Some post actions are required after compaction job (map-reduce) is
finished.
+ *
+ * @return A list of {@link CompactionCompleteAction}s which needs to be
executed after
+ * map-reduce is done.
+ */
+ @Override
+ public List<CompactionCompleteAction<FileSystemDataset>>
getCompactionCompleteActions() throws IOException {
+ Preconditions.checkArgument(state.contains(COMPACTION_COMPLETE_ACTIONS));
+ ArrayList<CompactionCompleteAction<FileSystemDataset>>
compactionCompleteActionsList = new ArrayList<>();
+ try {
+ for (String s : state.getPropAsList(COMPACTION_COMPLETE_ACTIONS)) {
+
compactionCompleteActionsList.add((CompactionCompleteAction<FileSystemDataset>)
GobblinConstructorUtils.invokeLongestConstructor(
+ Class.forName(s), state, getConfigurator()));
+ }
+ } catch (ReflectiveOperationException e) {
+ throw new IOException(e);
+ }
+ return compactionCompleteActionsList;
+ }
+}
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
new file mode 100644
index 0000000..9b07a52
--- /dev/null
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuiteBaseWithConfigurableCompleteActionFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.suite;
+
+import org.apache.gobblin.configuration.State;
+
+
+public class CompactionSuiteBaseWithConfigurableCompleteActionFactory extends
CompactionSuiteBaseFactory {
+ public CompactionSuiteBaseWithConfigurableCompleteAction createSuite(State
state) {
+ return new CompactionSuiteBaseWithConfigurableCompleteAction(state);
+ }
+}
+
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
index f98f7d9..3494daf 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerSecurityManager.java
@@ -92,6 +92,7 @@ public class YarnContainerSecurityManager extends
AbstractIdleService {
@Override
protected void shutDown() throws Exception {
// Nothing to do
+ LOGGER.info("Attempt to shut down YarnContainerSecurityManager");
}
/**