szilard-nemeth commented on code in PR #4425:
URL: https://github.com/apache/hadoop/pull/4425#discussion_r895036356


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileControllerFactory.java:
##########
@@ -66,76 +65,64 @@ public LogAggregationFileControllerFactory(Configuration 
conf) {
     Collection<String> fileControllers = conf.getStringCollection(
         YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
     List<String> controllerClassName = new ArrayList<>();
-
     Map<String, String> controllerChecker = new HashMap<>();
 
-    for (String fileController : fileControllers) {
+    for (String controllerName : fileControllers) {
       Preconditions.checkArgument(validateAggregatedFileControllerName(
-          fileController), "The FileControllerName: " + fileController
-          + " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
-          +" is invalid." + "The valid File Controller name should only "
-          + "contain a-zA-Z0-9_ and can not start with numbers");
+          controllerName), String.format("The FileControllerName: %s set in " +
+          "%s is invalid.The valid File Controller name should only contain " +
+          "a-zA-Z0-9_ and cannot start with numbers", controllerName,
+          YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS));
 
-      String remoteDirStr = String.format(
-          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
-          fileController);
-      String remoteDir = conf.get(remoteDirStr);
-      boolean defaultRemoteDir = false;
-      if (remoteDir == null || remoteDir.isEmpty()) {
-        remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
-        defaultRemoteDir = true;
-      }
-      String suffixStr = String.format(
-          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
-          fileController);
-      String suffix = conf.get(suffixStr);
-      boolean defaultSuffix = false;
-      if (suffix == null || suffix.isEmpty()) {
-        suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-        defaultSuffix = true;
-      }
-      String dirSuffix = remoteDir + "-" + suffix;
-      if (controllerChecker.containsKey(dirSuffix)) {
-        if (defaultRemoteDir && defaultSuffix) {
-          String fileControllerStr = controllerChecker.get(dirSuffix);
-          List<String> controllersList = new ArrayList<>();
-          controllersList.add(fileControllerStr);
-          controllersList.add(fileController);
-          fileControllerStr = StringUtils.join(controllersList, ",");
-          controllerChecker.put(dirSuffix, fileControllerStr);
-        } else {
-          String conflictController = controllerChecker.get(dirSuffix);
-          throw new RuntimeException("The combined value of " + remoteDirStr
-              + " and " + suffixStr + " should not be the same as the value"
-              + " set for " + conflictController);
-        }
+      validateDuplicateRemoteAppLogDirs(conf, controllerChecker, 
controllerName);
+      DeterminedControllerClassName className =
+          new DeterminedControllerClassName(conf, controllerName);
+      controllerClassName.add(className.value);
+      LogAggregationFileController controller = 
createFileControllerInstance(conf,
+          controllerClassName, controllerName, className);
+      controller.initialize(conf, controllerName);
+      controllers.add(controller);
+    }
+  }
+
+  private LogAggregationFileController createFileControllerInstance(
+      Configuration conf, List<String> controllerClassName,
+      String fileController, DeterminedControllerClassName className) {
+    Class<? extends LogAggregationFileController> clazz = conf.getClass(
+        className.configKey, null, LogAggregationFileController.class);
+    if (clazz == null) {
+      throw new RuntimeException("No class defined for " + fileController);
+    }
+    LogAggregationFileController instance = ReflectionUtils.newInstance(clazz, 
conf);
+    if (instance == null) {
+      throw new RuntimeException("No object created for " + 
controllerClassName);
+    }
+    return instance;
+  }
+
+  private void validateDuplicateRemoteAppLogDirs(Configuration conf,

Review Comment:
   Sure, renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to