[
https://issues.apache.org/jira/browse/AMBARI-24878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687469#comment-16687469
]
ASF GitHub Bot commented on AMBARI-24878:
-----------------------------------------
kasakrisz closed pull request #14: AMBARI-24878 - Infra Manager: kerberos
support
URL: https://github.com/apache/ambari-infra/pull/14
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index 0118c769..ddc4f000 100644
---
a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++
b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -18,10 +18,16 @@
*/
package org.apache.ambari.infra;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,15 +42,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
// TODO: use swagger
public class InfraClient implements AutoCloseable {
@@ -96,6 +97,12 @@ public JobExecutionInfo startJob(String jobName, String
parameters) {
try {
String responseText = execute(new
HttpPost(uriBuilder.build())).getBody();
Map<String, Object> responseContent = new
ObjectMapper().readValue(responseText, new
TypeReference<HashMap<String,Object>>() {});
+ if (!responseContent.containsKey("jobId"))
+ throw new NullPointerException("jobId is not found in start job
responseContent");
+ if (!responseContent.containsKey("jobExecutionData"))
+ throw new NullPointerException("jobExecutionData is not found in start
job responseContent");
+ if (!((Map)responseContent.get("jobExecutionData")).containsKey("id"))
+ throw new NullPointerException("id is not found in jobExecutionData");
return new JobExecutionInfo(responseContent.get("jobId").toString(),
((Map)responseContent.get("jobExecutionData")).get("id").toString());
} catch (URISyntaxException | JsonParseException | JsonMappingException e)
{
throw new RuntimeException(e);
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
index 6a36f724..5c783d6e 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
@@ -54,6 +54,8 @@
@JsonSerialize(converter = FsPermissionToStringConverter.class)
@JsonDeserialize(converter = StringToFsPermissionConverter.class)
private FsPermission hdfsFilePermission;
+ private String hdfsKerberosPrincipal;
+ private String hdfsKerberosKeytabPath;
private String start;
private String end;
@JsonSerialize(converter = DurationToStringConverter.class)
@@ -172,6 +174,22 @@ public void setHdfsFilePermission(FsPermission
hdfsFilePermission) {
this.hdfsFilePermission = hdfsFilePermission;
}
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
public Optional<S3Properties> s3Properties() {
if (isBlank(s3BucketName))
return Optional.empty();
@@ -183,6 +201,18 @@ public void setHdfsFilePermission(FsPermission
hdfsFilePermission) {
s3Endpoint));
}
+ public Optional<HdfsProperties> hdfsProperties() {
+ if (isBlank(hdfsDestinationDirectory))
+ return Optional.empty();
+
+ return Optional.of(new HdfsProperties(
+ hdfsEndpoint,
+ hdfsDestinationDirectory,
+ hdfsFilePermission,
+ hdfsKerberosPrincipal,
+ hdfsKerberosKeytabPath));
+ }
+
public String getStart() {
return start;
}
@@ -234,12 +264,9 @@ public void validate() {
break;
case HDFS:
- if (isBlank(hdfsEndpoint))
- throw new IllegalArgumentException(String.format(
- "The property hdfsEndpoint can not be null or empty string
when destination is set to %s!", HDFS.name()));
- if (isBlank(hdfsDestinationDirectory))
- throw new IllegalArgumentException(String.format(
- "The property hdfsDestinationDirectory can not be null or
empty string when destination is set to %s!", HDFS.name()));
+ hdfsProperties()
+ .orElseThrow(() -> new IllegalArgumentException("HDFS related
properties must be set if the destination is " + HDFS.name()))
+ .validate();
}
requireNonNull(solr, "No solr query was specified for archiving job!");
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 85fb364d..af522d3b 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -32,7 +32,6 @@
import org.apache.ambari.infra.job.JobContextRepository;
import org.apache.ambari.infra.job.JobScheduler;
import org.apache.ambari.infra.job.ObjectSource;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
@@ -103,8 +102,8 @@ public DocumentExporter documentExporter(DocumentItemReader
documentItemReader,
break;
case HDFS:
org.apache.hadoop.conf.Configuration conf = new
org.apache.hadoop.conf.Configuration();
- conf.set("fs.defaultFS", parameters.getHdfsEndpoint());
- fileAction.add(new HdfsUploader(conf, new
Path(parameters.getHdfsDestinationDirectory()),
parameters.getHdfsFilePermission()));
+ fileAction.add(new HdfsUploader(conf,
+ parameters.hdfsProperties().orElseThrow(() -> new
IllegalStateException("HDFS properties are not provided!"))));
break;
case LOCAL:
baseDir = new File(parameters.getLocalDestinationDirectory());
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index a5735623..8ad576c4 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -23,7 +23,6 @@
import static org.apache.commons.lang.StringUtils.isBlank;
import java.time.Duration;
-import java.util.Optional;
import org.apache.ambari.infra.job.JobProperties;
import org.apache.ambari.infra.json.DurationToStringConverter;
@@ -40,6 +39,7 @@
private String fileNameSuffixDateFormat;
private Duration ttl;
private SolrProperties solr;
+
private String s3AccessFile;
private String s3KeyPrefix;
private String s3BucketName;
@@ -48,6 +48,8 @@
private String hdfsEndpoint;
private String hdfsDestinationDirectory;
private FsPermission hdfsFilePermission;
+ private String hdfsKerberosPrincipal;
+ private String hdfsKerberosKeytabPath;
public int getReadBlockSize() {
return readBlockSize;
@@ -145,17 +147,6 @@ public void setS3Endpoint(String s3Endpoint) {
this.s3Endpoint = s3Endpoint;
}
- public Optional<S3Properties> s3Properties() {
- if (isBlank(s3BucketName))
- return Optional.empty();
-
- return Optional.of(new S3Properties(
- s3AccessFile,
- s3KeyPrefix,
- s3BucketName,
- s3Endpoint));
- }
-
public String getHdfsEndpoint() {
return hdfsEndpoint;
}
@@ -180,6 +171,22 @@ public void setHdfsDestinationDirectory(String
hdfsDestinationDirectory) {
this.hdfsDestinationDirectory = hdfsDestinationDirectory;
}
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
private int getIntJobParameter(JobParameters jobParameters, String
parameterName, int defaultValue) {
String valueText = jobParameters.getString(parameterName);
if (isBlank(valueText))
@@ -203,6 +210,8 @@ public ArchivingParameters merge(JobParameters
jobParameters) {
archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint",
hdfsEndpoint));
archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory",
hdfsDestinationDirectory));
archivingParameters.setHdfsFilePermission(toFsPermission(jobParameters.getString("hdfsFilePermission",
FsPermissionToStringConverter.toString(hdfsFilePermission))));
+
archivingParameters.setHdfsKerberosPrincipal(jobParameters.getString("hdfsKerberosPrincipal",
hdfsKerberosPrincipal));
+
archivingParameters.setHdfsKerberosKeytabPath(jobParameters.getString("hdfsKerberosKeytabPath",
hdfsKerberosKeytabPath));
archivingParameters.setSolr(solr.merge(jobParameters));
archivingParameters.setStart(jobParameters.getString("start"));
archivingParameters.setEnd(jobParameters.getString("end"));
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
new file mode 100644
index 00000000..da4137fe
--- /dev/null
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class HdfsProperties {
+ private static final String DEFAULT_FILE_PERMISSION = "640";
+
+ private final String hdfsEndpoint;
+ private final String hdfsDestinationDirectory;
+ private final FsPermission hdfsFilePermission;
+ private final String hdfsKerberosPrincipal;
+ private final String hdfsKerberosKeytabPath;
+
+ public HdfsProperties(String hdfsEndpoint, String hdfsDestinationDirectory,
FsPermission hdfsFilePermission, String hdfsKerberosPrincipal, String
hdfsKerberosKeytabPath) {
+ this.hdfsEndpoint = hdfsEndpoint;
+ this.hdfsDestinationDirectory = hdfsDestinationDirectory;
+ this.hdfsFilePermission = hdfsFilePermission == null ? new
FsPermission(DEFAULT_FILE_PERMISSION) : hdfsFilePermission;
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
+ public String getHdfsEndpoint() {
+ return hdfsEndpoint;
+ }
+
+ public String getHdfsDestinationDirectory() {
+ return hdfsDestinationDirectory;
+ }
+
+ public FsPermission getHdfsFilePermission() {
+ return hdfsFilePermission;
+ }
+
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ @Override
+ public String toString() {
+ return "HdfsProperties{" +
+ "hdfsEndpoint='" + hdfsEndpoint + '\'' +
+ ", hdfsDestinationDirectory='" + hdfsDestinationDirectory + '\'' +
+ ", hdfsFilePermission=" + hdfsFilePermission +
+ ", hdfsKerberosPrincipal='" + hdfsKerberosPrincipal + '\'' +
+ ", hdfsKerberosKeytabPath='" + hdfsKerberosKeytabPath + '\'' +
+ '}';
+ }
+
+ public void validate() {
+ if (isBlank(hdfsDestinationDirectory))
+ throw new IllegalArgumentException("The property
hdfsDestinationDirectory can not be null or empty string!");
+
+ if (isNotBlank(hdfsKerberosPrincipal) && isBlank(hdfsKerberosKeytabPath))
+ throw new IllegalArgumentException("The property hdfsKerberosPrincipal
is specified but hdfsKerberosKeytabPath is blank!");
+
+ if (isBlank(hdfsKerberosPrincipal) && isNotBlank(hdfsKerberosKeytabPath))
+ throw new IllegalArgumentException("The property hdfsKerberosKeytabPath
is specified but hdfsKerberosPrincipal is blank!");
+ }
+}
diff --git
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
index 469326fb..ff486738 100644
---
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
+++
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.infra.job.archive;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -25,31 +27,59 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
public class HdfsUploader extends AbstractFileAction {
+ private static final Logger LOG =
LoggerFactory.getLogger(HdfsUploader.class);
- private static final String DEFAULT_FILE_PERMISSION = "640";
private final Configuration configuration;
- private final Path destinationDirectory;
- private final FsPermission fsPermission;
+ private final HdfsProperties properties;
- public HdfsUploader(Configuration configuration, Path destinationDirectory,
FsPermission fsPermission) {
- this.destinationDirectory = destinationDirectory;
+ public HdfsUploader(Configuration configuration, HdfsProperties properties) {
+ this.properties = properties;
this.configuration = configuration;
- this.fsPermission = fsPermission == null ? new
FsPermission(DEFAULT_FILE_PERMISSION) : fsPermission;
+
+ if (new ClassPathResource("core-site.xml").exists()) {
+ LOG.info("Hdfs core-site.xml is found in the classpath.");
+ }
+ else {
+ LOG.warn("Hdfs core-site.xml is not found in the classpath. Using
defaults.");
+ }
+ if (new ClassPathResource("hdfs-site.xml").exists()) {
+ LOG.info("Hdfs hdfs-site.xml is found in the classpath.");
+ }
+ else {
+ LOG.warn("Hdfs hdfs-site.xml is not found in the classpath. Using
defaults.");
+ }
+ if (isNotBlank(properties.getHdfsEndpoint())) {
+ LOG.info("Hdfs endpoint is defined in Infra Manager properties. Setting
fs.defaultFS to {}", properties.getHdfsEndpoint());
+ this.configuration.set("fs.defaultFS", properties.getHdfsEndpoint());
+ }
+
+ UserGroupInformation.setConfiguration(configuration);
}
@Override
protected File onPerform(File inputFile) {
+ try {
+ if
("kerberos".equalsIgnoreCase(configuration.get("hadoop.security.authentication")))
+
UserGroupInformation.loginUserFromKeytab(properties.getHdfsKerberosPrincipal(),
properties.getHdfsKerberosKeytabPath());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
try (FileSystem fileSystem = FileSystem.get(configuration)) {
- Path destination = new Path(destinationDirectory, inputFile.getName());
+
+ Path destination = new Path(properties.getHdfsDestinationDirectory(),
inputFile.getName());
if (fileSystem.exists(destination)) {
throw new UnsupportedOperationException(String.format("File '%s'
already exists!", destination));
}
fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()),
destination);
- fileSystem.setPermission(destination, fsPermission);
+ fileSystem.setPermission(destination,
properties.getHdfsFilePermission());
return inputFile;
}
diff --git a/pom.xml b/pom.xml
index 29271c16..b6f52f9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
<deb.python.ver>python (>= 2.6)</deb.python.ver>
<deb.architecture>amd64</deb.architecture>
<deb.dependency.list>${deb.python.ver}</deb.dependency.list>
- <hadoop.version>3.0.0</hadoop.version>
+ <hadoop.version>3.1.1</hadoop.version>
<surefire.argLine>-Xmx1024m -Xms512m</surefire.argLine>
<zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
<ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Infra Manager: kerberos support
> -------------------------------
>
> Key: AMBARI-24878
> URL: https://issues.apache.org/jira/browse/AMBARI-24878
> Project: Ambari
> Issue Type: Improvement
> Components: ambari-infra
> Affects Versions: 2.8.0
> Reporter: Krisztian Kasa
> Assignee: Krisztian Kasa
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.8.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)