This is an automated email from the ASF dual-hosted git repository. oleewere pushed a commit to branch cloudbreak in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
commit d7499927f4dc2349a45261a23b0b955b173250d8 Author: Olivér Szabó <oleew...@gmail.com> AuthorDate: Mon Nov 19 14:19:57 2018 +0100 AMBARI-24833. HDFS client kerberos support + small fixes (#27) * AMBARI-24833. HDFS client kerberos support + small fixes * AMBARI-24833. Fix principal description --- .../logfeeder/common/LogFeederConstants.java | 2 + .../ambari/logfeeder/conf/LogFeederProps.java | 27 ++------ ...HdfsOutputConfig.java => HdfsOutputConfig.java} | 65 +++++++++++++++++-- .../impl/AbstractInputConfigHandler.java | 2 +- .../impl/CloudStorageInputConfigHandler.java | 2 +- .../operations/impl/DefaultInputConfigHandler.java | 2 +- .../logfeeder/output/OutputLineEnricher.java | 2 +- .../cloud/upload/ExternalHDFSUploadClient.java | 73 ---------------------- .../output/cloud/upload/HDFSUploadClient.java | 55 ++++++++++++---- .../output/cloud/upload/UploadClientFactory.java | 4 +- 10 files changed, 114 insertions(+), 120 deletions(-) diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java index f9ef32d..a15ac74 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java @@ -132,6 +132,8 @@ public class LogFeederConstants { public static final String HDFS_PORT = "logfeeder.hdfs.port"; public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions"; public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos"; + public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab"; + public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal"; public static final String S3_ENDPOINT = "logfeeder.s3.endpoint"; public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com"; diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java index f2eb6c7..b6ab4c7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java @@ -19,7 +19,7 @@ package org.apache.ambari.logfeeder.conf; import org.apache.ambari.logfeeder.common.LogFeederConstants; -import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig; +import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig; import org.apache.ambari.logfeeder.conf.output.RolloverConfig; import org.apache.ambari.logfeeder.conf.output.S3OutputConfig; import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties; @@ -53,7 +53,7 @@ public class LogFeederProps implements LogFeederProperties { private S3OutputConfig s3OutputConfig; @Inject - private ExternalHdfsOutputConfig hdfsOutputConfig; + private HdfsOutputConfig hdfsOutputConfig; private Properties properties; @@ -258,7 +258,7 @@ public class LogFeederProps implements LogFeederProperties { defaultValue = "false", sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} ) - @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}") + @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}") private boolean useCloudHdfsClient; @LogSearchPropertyDescription( @@ -281,15 +281,6 @@ public class LogFeederProps implements LogFeederProperties { private String cloudBasePath; @LogSearchPropertyDescription( - name = LogFeederConstants.HDFS_USER, - description = "Overrides HADOOP_USER_NAME variable at runtime", - examples = {"hdfs"}, - sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} - ) - @Value("${"+ LogFeederConstants.HDFS_USER + ":}") - private String logfeederHdfsUser; - - @LogSearchPropertyDescription( name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS, description = "Use filters for inputs (with filters the output format will be JSON)", examples = {"true"}, @@ -460,7 +451,7 @@ public class LogFeederProps implements LogFeederProperties { this.cloudStorageMode = cloudStorageMode; } - public ExternalHdfsOutputConfig getHdfsOutputConfig() { + public HdfsOutputConfig getHdfsOutputConfig() { return hdfsOutputConfig; } @@ -480,7 +471,7 @@ public class LogFeederProps implements LogFeederProperties { this.rolloverConfig = rolloverConfig; } - public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) { + public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) { this.hdfsOutputConfig = hdfsOutputConfig; } @@ -512,14 +503,6 @@ public class LogFeederProps implements LogFeederProperties { return useCloudHdfsClient; } - public String getLogfeederHdfsUser() { - return logfeederHdfsUser; - } - - public void setLogfeederHdfsUser(String logfeederHdfsUser) { - this.logfeederHdfsUser = logfeederHdfsUser; - } - public void setUseCloudHdfsClient(boolean useCloudHdfsClient) { this.useCloudHdfsClient = useCloudHdfsClient; } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java similarity index 59% rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java index fbbf869..312f2f0 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java @@ -24,7 +24,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; @Configuration -public class ExternalHdfsOutputConfig { +public class HdfsOutputConfig { @LogSearchPropertyDescription( name = LogFeederConstants.HDFS_HOST, @@ -55,6 +55,15 @@ public class ExternalHdfsOutputConfig { private String hdfsFilePermissions; @LogSearchPropertyDescription( + name = LogFeederConstants.HDFS_USER, + description = "Overrides HADOOP_USER_NAME variable at runtime", + examples = {"hdfs"}, + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.HDFS_USER + ":}") + private String logfeederHdfsUser; + + @LogSearchPropertyDescription( name = LogFeederConstants.HDFS_KERBEROS, description = "Enable kerberos support for HDFS", examples = {"true"}, @@ -62,7 +71,27 @@ public class ExternalHdfsOutputConfig { sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} ) @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}") - private boolean secure; + private boolean hdfsKerberos; + + @LogSearchPropertyDescription( + name = LogFeederConstants.HDFS_KERBEROS_KEYTAB, + description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ", + examples = {"/etc/security/keytabs/mykeytab.keytab"}, + defaultValue = "/etc/security/keytabs/logfeeder.service.keytab", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}") + private String keytab; + + @LogSearchPropertyDescription( + name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL, + description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ", + examples = {"mylogfeeder/myho...@example.com"}, + defaultValue = "logfeeder/_HOST", + sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE} + ) + @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}") + private String principal; public String getHdfsHost() { return hdfsHost; @@ -88,11 +117,35 @@ public class ExternalHdfsOutputConfig { this.hdfsFilePermissions = hdfsFilePermissions; } - public boolean isSecure() { - return secure; + public String getKeytab() { + return keytab; + } + + public void setKeytab(String keytab) { + this.keytab = keytab; + } + + public String getPrincipal() { + return principal; + } + + public void setPrincipal(String principal) { + this.principal = principal; + } + + public String getLogfeederHdfsUser() { + return logfeederHdfsUser; + } + + public void setLogfeederHdfsUser(String logfeederHdfsUser) { + this.logfeederHdfsUser = logfeederHdfsUser; + } + + public boolean isHdfsKerberos() { + return hdfsKerberos; } - public void setSecure(boolean secure) { - this.secure = secure; + public void setHdfsKerberos(boolean hdfsKerberos) { + this.hdfsKerberos = hdfsKerberos; } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java index 31bfd0d..d383ed1 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java @@ -47,7 +47,7 @@ public abstract class AbstractInputConfigHandler implements InputConfigHandler { for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) { for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) { if (filterDescriptor == null) { - logger.warn("Filter descriptor is smpty. Skipping..."); + logger.warn("Filter descriptor is empty. Skipping..."); continue; } if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) { diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java index ac10b2d..c2e73b7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java @@ -52,7 +52,7 @@ public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler { final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters(); for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { if (inputDescriptor == null) { - logger.warn("Input descriptor is smpty. Skipping..."); + logger.warn("Input descriptor is empty. Skipping..."); continue; } LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode(); diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java index dd0fe3e..4677461 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java @@ -74,7 +74,7 @@ public class DefaultInputConfigHandler extends AbstractInputConfigHandler { private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) { for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) { if (inputDescriptor == null) { - logger.warn("Input descriptor is smpty. Skipping..."); + logger.warn("Input descriptor is empty. Skipping..."); continue; } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java index bd9e3df..ff0805d 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java @@ -46,7 +46,7 @@ public class OutputLineEnricher { Input input = inputMarker.getInput(); // Update the block with the context fields for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) { - if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) { + if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) { jsonObj.put(entry.getKey(), entry.getValue()); } } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java deleted file mode 100644 index a23a715..0000000 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ambari.logfeeder.output.cloud.upload; - -import org.apache.ambari.logfeeder.conf.LogFeederProps; -import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig; -import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -/** - * HDFS (on-prem) specific uploader client that can work with an external HDFS. - */ -public class ExternalHDFSUploadClient implements UploadClient { - - private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class); - - private final ExternalHdfsOutputConfig hdfsOutputConfig; - private final FsPermission fsPermission; - private FileSystem fs; - - public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) { - this.hdfsOutputConfig = hdfsOutputConfig; - this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions()); - } - - @Override - public void init(LogFeederProps logFeederProps) { - logger.info("Initialize external HDFS client ..."); - if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) { - logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser()); - System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser()); - } - this.fs = LogFeederHDFSUtil.buildFileSystem( - hdfsOutputConfig.getHdfsHost(), - String.valueOf(hdfsOutputConfig.getHdfsPort())); - if (logFeederProps.getHdfsOutputConfig().isSecure()) { - logger.info("Kerberos is enabled for external HDFS."); - Configuration conf = fs.getConf(); - conf.set("hadoop.security.authentication", "kerberos"); - } - } - - @Override - public void upload(String source, String target) throws Exception { - LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission); - } - - @Override - public void close() { - LogFeederHDFSUtil.closeFileSystem(fs); - } -} diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java index c2a8497..421c4c5 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java @@ -19,15 +19,17 @@ package org.apache.ambari.logfeeder.output.cloud.upload; import org.apache.ambari.logfeeder.conf.LogFeederProps; +import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig; import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.IOException; - /** * HDFS client that uses core-site.xml file from the classpath to load the configuration. * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages @@ -35,37 +37,64 @@ import java.io.IOException; public class HDFSUploadClient implements UploadClient { private static final String FS_DEFAULT_FS = "fs.defaultFS"; + private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class); + private final boolean externalHdfs; + private final HdfsOutputConfig hdfsOutputConfig; + private final FsPermission fsPermission; private FileSystem fs; + public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) { + this.hdfsOutputConfig = hdfsOutputConfig; + this.externalHdfs = externalHdfs; + this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions()); + } + @Override public void init(LogFeederProps logFeederProps) { - logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath."); - Configuration configuration = new Configuration(); + final Configuration configuration; + if (externalHdfs) { + configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs"); + logger.info("Using external HDFS client as core-site.xml is not located on the classpath."); + } else { + configuration = new Configuration(); + logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath."); + } if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) { configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs()); } - if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) { - logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser()); - System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser()); + if (hdfsOutputConfig.isHdfsKerberos()) { + logger.info("Kerberos is enabled for HDFS."); + configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName); + UserGroupInformation.setConfiguration(configuration); + try { + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab()); + UserGroupInformation.setLoginUser(ugi); + } catch (Exception e) { + logger.error("Error during kerberos login", e); + throw new RuntimeException(e); + } + } else { + if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) { + logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser()); + System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser()); + } } + logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions()); this.fs = LogFeederHDFSUtil.buildFileSystem(configuration); } @Override public void upload(String source, String target) throws Exception { - LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null); + LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission); } @Override - public void close() throws IOException { + public void close() { LogFeederHDFSUtil.closeFileSystem(fs); } - private boolean isHadoopFileSystem(Configuration conf) { - return conf.get(FS_DEFAULT_FS).contains("hdfs://"); - } - } diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java index bea2943..27d69c7 100644 --- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java +++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java @@ -43,11 +43,11 @@ public class UploadClientFactory { if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) { logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings."); logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS); - return new HDFSUploadClient(); + return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false); } else if (CloudStorageDestination.HDFS.equals(destType)) { logger.info("External HDFS output will be used."); - return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig()); + return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true); } else if (CloudStorageDestination.S3.equals(destType)) { if (useHdfsClient) { logger.info("S3 cloud output will be used with HDFS client.");