[
https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16757031#comment-16757031
]
Aljoscha Krettek commented on FLINK-11429:
------------------------------------------
Thanks for keeping us posted!
> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
> Key: FLINK-11429
> URL: https://issues.apache.org/jira/browse/FLINK-11429
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.7.1
> Reporter: Mario Georgiev
> Priority: Minor
>
> Hello,
> The problem is: if I put the core-site.xml somewhere in the Flink image and
> point to it from flink-conf.yaml (see the entry quoted after the stack
> trace), it does not get picked up and I get this exception:
> {code}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
>     ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:589)
>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
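> For reference, this is the flink-conf.yaml entry pointing Flink at the Hadoop
> configuration directory (the same line the Dockerfile below appends):
> {code}
> fs.hdfs.hadoopconf: /hadoop/conf
> {code}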
> However, if I put the ACCESS_KEY and the SECRET_KEY into ENV variables in the
> Dockerfile, they do get picked up and everything works. Why is the
> core-site.xml being disregarded? Even when I don't copy the core-site.xml at
> all, it still works with just the ENV variables.
> The file looks like it is on the classpath, yet it is entirely disregarded:
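> For comparison, the variant that does work is nothing more than the standard
> AWS SDK environment variables (values redacted; e.g. in the Dockerfile):
> {code}
> # Read by EnvironmentVariableCredentialsProvider
> ENV AWS_ACCESS_KEY_ID=<hidden>
> ENV AWS_SECRET_ACCESS_KEY=<hidden>
> {code}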
>
> {code}
> - Classpath: /opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:
> {code}
>
> {code:xml}
> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <!-- Comma-separated list of local directories used to buffer
>        large results prior to transmitting them to S3. -->
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <description>AWS access key ID.
>       Omit for IAM role-based or provider-based authentication.</description>
>     <value><hidden></value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <description>AWS secret key.
>       Omit for IAM role-based or provider-based authentication.</description>
>     <value><hidden></value>
>   </property>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>   </property>
> </configuration>
> {code}
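> (For completeness: the Flink docs also describe giving the shaded
> flink-s3-fs-hadoop filesystem its keys directly in flink-conf.yaml instead of
> core-site.xml; a minimal sketch of that variant, values redacted:)
> {code}
> s3.access-key: <hidden>
> s3.secret-key: <hidden>
> {code}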
> I am building the Kubernetes standalone image as follows.
> Dockerfile:
> {code}
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
>
> # Install requirements
> # Modification to original Dockerfile to support RocksDB
> # RUN apk add --no-cache bash snappy
> # This is a fix for RocksDB compatibility
>
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> ENV FLINK_OPT $FLINK_HOME/opt
> ENV FLINK_HADOOP_CONF /hadoop/conf
>
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
>
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
>     ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
>     ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
>     addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
>     chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
>     chown -h flink:flink $FLINK_HOME
>
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
> COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
> ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF
> RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml
> RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar \
>     https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> # Transitive dependency of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar \
>     http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar \
>     http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar \
>     http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
>
> # Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
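> For completeness, the image is then built along these lines (tag and paths
> are placeholders, not the exact ones used):
> {code}
> docker build \
>   --build-arg flink_dist=flink-1.7.1-bin-scala_2.12.tgz \
>   --build-arg job_jar=target/job.jar \
>   -t flink-job-cluster:1.7.1 .
> {code}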
>
> The job itself is a simple WordCount:
> {code:java}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
>
> public class WordCount {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>         String[] words1 = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             // nextInt(length), not nextInt(length - 1), so the last word can be picked too
>             words.add(words1[rnd.nextInt(words1.length)]);
>         }
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>         env.execute();
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> {code}
> Job manager kubernetes args (it is a template, so disregard the placeholders):
> {code:java}
> "job-cluster",
> "--job-classname", "{classname}",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dparallelism.default=2",
> "-Dblob.server.port=6124",
> "-Dqueryable-state.server.ports=6125",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"
> {code}
> Task manager kubernetes args (again, templated):
> {code:java}
> ["task-manager",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"]
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)