Mario Georgiev created FLINK-11429:
---------------------------------------

             Summary: Flink fails to authenticate s3a with core-site.xml
                 Key: FLINK-11429
                 URL: https://issues.apache.org/jira/browse/FLINK-11429
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.7.1
            Reporter:  Mario Georgiev


Hello,

The problem: if I add core-site.xml to the Flink image and put the path to its 
directory in flink-conf.yaml, the file does not get picked up and I get the 
following exception:


{code:java}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 31 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
    ... 48 more
Caused by: java.net.SocketException: Network unreachable (connect failed)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
{code}

However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the 
Dockerfile, they get picked up and it works. Why is the core-site.xml being 
disregarded?
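
To illustrate how I read the exception: the message names three providers (BasicAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, InstanceProfileCredentialsProvider), which looks like a chain tried in order, with the "Network unreachable" cause coming from the last one. The sketch below models that fallback in plain Java. Everything in it is hypothetical and for illustration only: `ProviderChainSketch`, `NamedProvider`, `chain` and `resolve` are not classes or methods of the AWS SDK, Hadoop or Flink. The environment variable names are the ones the AWS SDK for Java 1.x reads as far as I know, and the simulated network failure is an assumption standing in for the instance metadata lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical model of a credentials provider chain; NOT the AWS SDK or Hadoop implementation.
final class ProviderChainSketch {

    /** A provider name plus a check that reports whether it can supply credentials. */
    static final class NamedProvider {
        final String name;
        final BooleanSupplier hasCredentials;

        NamedProvider(String name, BooleanSupplier hasCredentials) {
            this.name = name;
            this.hasCredentials = hasCredentials;
        }
    }

    /** True if any of the given keys maps to a non-blank value. */
    static boolean anySet(Map<String, String> source, String... keys) {
        for (String key : keys) {
            String value = source.get(key);
            if (value != null && !value.trim().isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /** Builds the three-step chain named in the exception message. */
    static List<NamedProvider> chain(Map<String, String> config, Map<String, String> env) {
        return Arrays.asList(
            // Keys taken from the core-site.xml in this report.
            new NamedProvider("BasicAWSCredentialsProvider",
                () -> anySet(config, "fs.s3a.access.key") && anySet(config, "fs.s3a.secret.key")),
            // Variable names assumed from the AWS SDK for Java 1.x.
            new NamedProvider("EnvironmentVariableCredentialsProvider",
                () -> anySet(env, "AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY")
                    && anySet(env, "AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY")),
            // Assumption: outside EC2 the metadata endpoint is unreachable, so this step fails.
            new NamedProvider("InstanceProfileCredentialsProvider",
                () -> { throw new IllegalStateException("Network unreachable (connect failed)"); }));
    }

    /** Returns the name of the first provider with credentials, or throws listing all tried. */
    static String resolve(List<NamedProvider> providers) {
        List<String> tried = new ArrayList<>();
        RuntimeException lastFailure = null;
        for (NamedProvider provider : providers) {
            tried.add(provider.name);
            try {
                if (provider.hasCredentials.getAsBoolean()) {
                    return provider.name;
                }
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and fall through to the next provider
            }
        }
        throw new IllegalStateException(
            "No AWS Credentials provided by " + String.join(" ", tried), lastFailure);
    }

    public static void main(String[] args) {
        // Empty config map: models the case where the core-site.xml keys never reach the chain.
        Map<String, String> config = Collections.emptyMap();
        try {
            System.out.println("Credentials from: " + resolve(chain(config, System.getenv())));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

If the first provider never sees the keys from core-site.xml, this model ends with the same shape of error as above unless the environment variables are set, which would match what I observe.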


{code:java}
<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>AWS access key ID.
            Omit for IAM role-based or provider-based authentication.</description>
        <value><hidden></value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>AWS secret key.
            Omit for IAM role-based or provider-based authentication.</description>
        <value><hidden></value>
    </property>


    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>


</configuration>
{code}
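
To rule out a problem with the file itself, a small check like the following could be run inside the container. It is only a sketch using the JDK's built-in XML parser: `CoreSiteCheck` and `readProperties` are hypothetical names, the default path is the one the Dockerfile in this report copies the file to, and it only tells whether the file is well-formed and contains the credential keys, not whether Flink reads it. Note that the listing above is redacted (`<value><hidden></value>` is not well-formed XML), so the check is meant for the real file.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical diagnostic helper; not part of Flink or Hadoop.
final class CoreSiteCheck {

    /** Parses a Hadoop-style configuration file into a name -> value map. */
    static Map<String, String> readProperties(File coreSite) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
            .newDocumentBuilder()
            .parse(coreSite);
        Map<String, String> props = new HashMap<>();
        NodeList properties = doc.getElementsByTagName("property");
        for (int i = 0; i < properties.getLength(); i++) {
            Element property = (Element) properties.item(i);
            NodeList names = property.getElementsByTagName("name");
            NodeList values = property.getElementsByTagName("value");
            // Skip entries that lack either a <name> or a <value> element.
            if (names.getLength() > 0 && values.getLength() > 0) {
                props.put(names.item(0).getTextContent().trim(),
                          values.item(0).getTextContent().trim());
            }
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Default path assumed from the COPY instruction in the Dockerfile of this report.
        File file = new File(args.length > 0 ? args[0] : "/etc/hadoop/conf/core-site.xml");
        Map<String, String> props = readProperties(file);
        String[] keys = {
            "fs.s3a.access.key",
            "fs.s3a.secret.key",
            "fs.s3a.aws.credentials.provider"
        };
        for (String key : keys) {
            String value = props.get(key);
            // Report presence only, never the secret values themselves.
            System.out.println(key + ": " + (value == null || value.isEmpty() ? "MISSING" : "set"));
        }
    }
}
```

If all three keys are reported as set, the file content is unlikely to be the cause, and the question becomes whether the shaded S3 file system looks at this file at all.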




I am building the Kubernetes standalone image as follows:

Dockerfile:


{code:java}
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

FROM openjdk:8-jre-alpine

# Install requirements
# Modification to original Dockerfile to support rocksdb
# RUN apk add --no-cache bash snappy
# This is a fix for RocksDB compatibility


# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV PATH $PATH:$FLINK_HOME/bin
ENV FLINK_CONF $FLINK_HOME/conf

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET

# Install build dependencies and flink
ADD $flink_dist $FLINK_INSTALL_PATH
ADD $job_jar $FLINK_INSTALL_PATH/job.jar

RUN set -x && \
  ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
  ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
  chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
  chown -h flink:flink $FLINK_HOME

# Modification to original Dockerfile
RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'

COPY core-site.xml /etc/hadoop/conf/core-site.xml

RUN echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" >> $FLINK_CONF/flink-conf.yaml

COPY docker-entrypoint.sh /

RUN chmod +x docker-entrypoint.sh

RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.7.3.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.183/aws-java-sdk-s3-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/flink-s3-fs-hadoop-1.7.1.jar http://central.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.7.1/flink-s3-fs-hadoop-1.7.1.jar

#Transitive Dependency of aws-java-sdk-s3
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.183/aws-java-sdk-core-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.183.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.183/aws-java-sdk-kms-1.11.183.jar
RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/jackson-core-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.6.7.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.6.7/jackson-databind-2.6.7.jar
RUN wget -O $FLINK_LIB_DIR/joda-time-2.8.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.4.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar
RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.3.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar
#Modification to original Dockerfile

USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]
{code}
 

 

 
{code:java}
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WordCount {



    public static void main (String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000L);

        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        String words1[] = new String[]{
                "football",
                "soccer",
                "billiards",
                "snooker",
                "tennis",
                "handball",
                "basketball"
        };

        List<String> words = new ArrayList<>();
        Random rnd = new Random();
        for (int i = 0; i < 500000; i++) {
            // nextInt's bound is exclusive, so words1.length covers every entry
            words.add(words1[rnd.nextInt(words1.length)]);
        }

        DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));


        src.map(str -> str.toLowerCase())
                .flatMap(new Splitter())
                .returns(TypeInformation.of(new TypeHint<Tuple2<String,Integer>>(){}))
                .keyBy(0)
                .sum(1)
                .print();

        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

{code}
Job manager Kubernetes args:
It is a template, so disregard the placeholders.


{code:java}
       "job-cluster",
       "--job-classname", "{classname}",
       "-Djobmanager.rpc.address={cluster.name}-jobmanager",
       "-Dparallelism.default=2",
       "-Dblob.server.port=6124",
       "-Dqueryable-state.server.ports=6125",
       "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
       "-Dstate.backend=rocksdb",
       "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
       "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
       "-Dstate.backend.incremental=true"
{code}
Task manager Kubernetes args:
Again, templated.



{code:java}
       ["task-manager",
       "-Djobmanager.rpc.address={cluster.name}-jobmanager",
       "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
       "-Dstate.backend=rocksdb",
       "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
       "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
       "-Dstate.backend.incremental=true"]
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
