abstractdog commented on code in PR #266:
URL: https://github.com/apache/tez/pull/266#discussion_r1113971444
##########
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java:
##########
@@ -1919,6 +1919,13 @@ private DAGRecoveryData recoverDAG() throws IOException, TezException {
RecoveryParser recoveryParser = new RecoveryParser(
this, recoveryFS, recoveryDataDir, appAttemptID.getAttemptId());
DAGRecoveryData recoveredDAGData = recoveryParser.parseRecoveryData();
+
+ if(Objects.isNull(recoveredDAGData) && amConf.getBoolean(
+ TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA,
+ TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA_DEFAULT)) {
+ throw new IOException("Found nothing to recover in currentAttemptId= "
Review Comment:
A couple of comments here:
1. I think it would make sense to extract the whole parsing logic (starting
from the instantiation of recoveryParser) into a separate method, to keep
recoverDAG from growing any further.
2. Regarding the actual fix: Objects.isNull tells the reader almost nothing
about the scenario discussed in the Jira, so please describe it in a code
comment here. Cover the most important points: the AM shut down
prematurely, and so did the recovery service; the recovery stream was not
closed; there is no valid recovery data; the recovery parser returns null
instead of failing; etc. Also, document the assumption made here: callers
of this method are expected to catch this exception and make the AM finish
in DAGAppMasterState.ERROR. This holds today, but a later refactor could
easily break it, since the actual AM failure logic does not live here.
3. What about including the recoveryDataDir path in the exception message,
to help the log reader investigate further?
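For illustration, points 1-3 could look roughly like the sketch below. It is a minimal, Tez-free sketch under stated assumptions: `RecoveryDataGuard`, `RecoveryDataSource` and `parseRecoveryDataOrFail` are hypothetical names. The functional interface stands in for `RecoveryParser#parseRecoveryData()`, and the flag value and directory would come from `amConf` and `recoveryDataDir` in the real code.

```java
import java.io.IOException;

/** Hypothetical sketch of the suggested extraction; not a Tez API. */
public class RecoveryDataGuard {

  /** Hypothetical stand-in for RecoveryParser#parseRecoveryData(). */
  @FunctionalInterface
  public interface RecoveryDataSource<T> {
    T parse() throws IOException;
  }

  /**
   * Parses the recovery data and fails fast if it is missing.
   *
   * Scenario from the Jira: the previous AM attempt shut down prematurely (and
   * the recovery service with it), so the recovery stream was never closed and
   * there is no valid recovery data; the parser returns null instead of failing.
   *
   * Assumption: callers catch the IOException and finish the AM with
   * DAGAppMasterState.ERROR. This method does not perform that failure itself.
   */
  public static <T> T parseRecoveryDataOrFail(RecoveryDataSource<T> source,
      boolean failOnMissingData, String recoveryDataDir, int attemptId)
      throws IOException {
    T recovered = source.parse();
    if (recovered == null && failOnMissingData) {
      // Include the directory so that log readers know where to look.
      throw new IOException("Found nothing to recover in currentAttemptId="
          + attemptId + ", recoveryDataDir=" + recoveryDataDir);
    }
    return recovered;
  }
}
```

In recoverDAG() the call might pass `recoveryParser::parseRecoveryData` and `recoveryDataDir.toString()`. That wiring is an untested assumption.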
##########
tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java:
##########
@@ -332,6 +339,92 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
}
+ @Test
+ public void testShutdownTezAMWithMissingRecovery() throws Exception {
+
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+
+ FileSystem spyRecoveryFs = spy(new FileSystem() {
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b,
Review Comment:
This looks like an IDE-generated method; please use the proper parameter
names (even if they are unused), e.g. here:
```
public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize,
    short replication, long blockSize, Progressable progress) throws IOException {
```
Please check the other methods too.
##########
tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java:
##########
@@ -332,6 +339,92 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
}
+ @Test
+ public void testShutdownTezAMWithMissingRecovery() throws Exception {
+
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+
+ FileSystem spyRecoveryFs = spy(new FileSystem() {
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int i) throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b,
+ int i, short i1, long l, Progressable progressable)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean rename(Path path, Path path1) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean delete(Path path, boolean b) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+ return new FileStatus[0];
+ }
+
+ @Override
+ public void setWorkingDirectory(Path path) {
+
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return null;
+ }
+
+ @Override
+ public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ return false;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return null;
+ }
+ });
+ when(spyRecoveryFs.exists(any())).thenReturn(false);
+
+ DAGAppMasterForTest dam = new DAGAppMasterForTest(attemptId, true);
+ dam.init(conf);
+ dam.start();
+
+ Field field = DAGAppMasterForTest.class.getSuperclass().getDeclaredField("recoveryFS");
+ field.setAccessible(true);
+ field.set(dam, spyRecoveryFs);
+
+ verify(dam.mockScheduler).setShouldUnregisterFlag();
+ verify(dam.mockShutdown).shutdown();
Review Comment:
We're testing the new feature here, but a couple of things are missing:
1. It's not clear at first sight which part of the spy FS causes the
expected behavior (the AM finishing with ERROR).
2. Could this method be extended to also cover what happens when
TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA=false?
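One way to address point 2 is to drive the same scenario once per flag value against a table of expected outcomes. The sketch below is a hypothetical plain-Java model with no Tez, Mockito or JUnit. `MissingRecoveryFlagCases`, `Outcome` and `missingRecoveryData` are made-up names. The expected outcome for `false` is an assumption that the real test would have to confirm against what `DAGAppMaster` actually does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical table-driven check covering both values of the flag. */
public class MissingRecoveryFlagCases {

  /** Hypothetical, coarse view of how the AM ends up. */
  enum Outcome { AM_ERROR, AM_CONTINUES }

  /**
   * Expected outcome per flag value when there is no recovery data.
   * The "false" row is an assumption: the diff only shows that no
   * IOException is thrown in that case.
   */
  static Map<Boolean, Outcome> expectedOutcomes() {
    Map<Boolean, Outcome> expected = new LinkedHashMap<>();
    expected.put(true, Outcome.AM_ERROR);
    expected.put(false, Outcome.AM_CONTINUES);
    return expected;
  }

  /** Runs the scenario once per flag value and collects every mismatch. */
  static List<String> runAll(Function<Boolean, Outcome> scenario) {
    List<String> mismatches = new ArrayList<>();
    for (Map.Entry<Boolean, Outcome> row : expectedOutcomes().entrySet()) {
      Outcome actual = scenario.apply(row.getKey());
      if (actual != row.getValue()) {
        mismatches.add("flag=" + row.getKey() + ": expected " + row.getValue()
            + " but got " + actual);
      }
    }
    return mismatches;
  }

  /** Stand-in for the branch in the diff, with recovered data == null. */
  static Outcome missingRecoveryData(boolean failOnMissingData) {
    return failOnMissingData ? Outcome.AM_ERROR : Outcome.AM_CONTINUES;
  }
}
```

In the real test, the scenario function would build a TezConfiguration with the flag set accordingly, start DAGAppMasterForTest, and map the verified interactions (for example setShouldUnregisterFlag() and shutdown()) to an Outcome.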
##########
tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java:
##########
@@ -332,6 +339,92 @@ public void testParseAllPluginsCustomAndYarnSpecified() throws IOException {
assertEquals(TC_NAME + CLASS_SUFFIX, tcDescriptors.get(1).getClassName());
}
+ @Test
+ public void testShutdownTezAMWithMissingRecovery() throws Exception {
+
+ TezConfiguration conf = new TezConfiguration();
+ conf.setBoolean(TezConfiguration.TEZ_AM_CREDENTIALS_MERGE, true);
+ conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, TEST_DIR.toString());
+ conf.setBoolean(TezConfiguration.TEZ_AM_FAILURE_ON_MISSING_RECOVERY_DATA, true);
+ conf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, true);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 2);
+
+ FileSystem spyRecoveryFs = spy(new FileSystem() {
Review Comment:
Is this spy reusable in other test methods in this class? If so, refactor it
into a class field; if not, make it obvious why it is special (with the
variable name and/or comments on the implemented methods).
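If the spy turns out to be reusable, a class-level fixture with an intent-revealing name could look like the sketch below. To keep it runnable without Hadoop, `RecoveryStore` is a hypothetical stand-in for the `FileSystem` being spied on, and `EMPTY_RECOVERY_STORE` is an illustrative name. The comments mark which stub matters for the missing-recovery scenario. Here that stub is assumed to be `exists()` returning false, since it is the only call the test stubs explicitly.

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hypothetical test fixtures. RecoveryStore stands in for the Hadoop
 * FileSystem used by recovery, so this sketch runs without Hadoop or Mockito.
 */
public class RecoveryTestFixtures {

  /** Minimal, hypothetical view of the file system operations recovery needs. */
  interface RecoveryStore {
    boolean exists(String path);

    Set<String> list(String dir);
  }

  /**
   * Shared fixture: a store in which the previous attempt left no recovery data.
   * Assumption: exists() returning false is what drives the missing-recovery
   * scenario (it is the only call stubbed explicitly in the test).
   */
  static final RecoveryStore EMPTY_RECOVERY_STORE = new RecoveryStore() {
    @Override
    public boolean exists(String path) {
      // Relevant stub: no recovery file exists for any path.
      return false;
    }

    @Override
    public Set<String> list(String dir) {
      // Neutral stub: not relevant to the missing-recovery scenario.
      return Collections.emptySet();
    }
  };
}
```

If the double is only used once, the same naming and comments can be applied to the local variable instead of promoting it to a field.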
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.