[
https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752090#comment-16752090
]
Mario Georgiev commented on FLINK-11429:
-----------------------------------------
I tried using Flink's Hadoop 2.8 binary and providing the appropriate Hadoop lib
dependencies, but still to no avail.
Putting the S3 secret key and access key in ENV variables works, while
core-site.xml is completely ignored no matter where I put it.
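For what it's worth, the Flink docs suggest the shaded flink-s3-fs-hadoop filesystem can also take credentials straight from flink-conf.yaml (Flink forwards s3.* keys to the S3A configuration), which sidesteps core-site.xml entirely. A minimal sketch, with placeholder values:
{code:java}
# flink-conf.yaml -- forwarded by the shaded S3 filesystem to fs.s3a.access.key / fs.s3a.secret.key
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
{code}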
> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
> Key: FLINK-11429
> URL: https://issues.apache.org/jira/browse/FLINK-11429
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.7.1
> Reporter: Mario Georgiev
> Priority: Critical
>
> Hello,
> The problem is: if I put the core-site.xml somewhere in the Flink image and
> point to it from flink-conf.yaml, it does not get picked up and I get an
> exception:
> {code:java}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
>     ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:589)
>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
> However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the
> Dockerfile, they get picked up and it works. Why is the core-site.xml being
> disregarded? Even if I don't copy the core-site.xml at all, it works with the
> ENV variables alone. This is the core-site.xml in question:
> {code:java}
> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <!-- Comma separated list of local directories used to buffer
>        large results prior to transmitting them to S3. -->
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <description>AWS access key ID.
>       Omit for IAM role-based or provider-based authentication.</description>
>     <value><hidden></value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <description>AWS secret key.
>       Omit for IAM role-based or provider-based authentication.</description>
>     <value><hidden></value>
>   </property>
>   <property>
>     <name>fs.s3a.aws.credentials.provider</name>
>     <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>   </property>
> </configuration>
> {code}
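> For comparison, the environment-variable workaround that does get picked up (presumably via the EnvironmentVariableCredentialsProvider listed in the stack trace above) is just the standard AWS SDK variables; the names below are the SDK defaults and the values are placeholders:
> {code:java}
> # Dockerfile snippet -- read by the AWS SDK's EnvironmentVariableCredentialsProvider
> ENV AWS_ACCESS_KEY_ID=<hidden>
> ENV AWS_SECRET_ACCESS_KEY=<hidden>
> {code}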
> I am building the Kubernetes standalone image as follows.
> Dockerfile:
> {code:java}
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
> # Install requirements
> # Modification to original Dockerfile to support rocksdb
> # RUN apk add --no-cache bash snappy
> # This is a fix for RocksDB compatibility
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
> ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
> ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
> addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
> chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
> chown -h flink:flink $FLINK_HOME
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
> COPY core-site.xml /etc/hadoop/conf/core-site.xml
> RUN echo "fs.hdfs.hadoopconf: /etc/hadoop/conf" >> $FLINK_CONF/flink-conf.yaml
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.7.3.jar \
>     https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.3/hadoop-aws-2.7.3.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.11.183.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.183/aws-java-sdk-s3-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/flink-s3-fs-hadoop-1.7.1.jar \
>     http://central.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.7.1/flink-s3-fs-hadoop-1.7.1.jar
> # Transitive dependency of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.183.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.183/aws-java-sdk-core-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.183.jar \
>     http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.183/aws-java-sdk-kms-1.11.183.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.6.7.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.6.7/jackson-annotations-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.6.7.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.6.7/jackson-core-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.6.7.jar \
>     http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.6.7/jackson-databind-2.6.7.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.8.1.jar \
>     http://central.maven.org/maven2/joda-time/joda-time/2.8.1/joda-time-2.8.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.4.jar \
>     http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.3.jar \
>     http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar
> #Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
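> As a side note, the Flink docs describe flink-s3-fs-hadoop as self-contained, so the separate hadoop-aws, aws-java-sdk-* and jackson/joda/httpcomponents downloads above may not be needed at all; a trimmed-down sketch of the download section (same URL as above) would be just:
> {code:java}
> # Self-contained shaded S3A filesystem; the unshaded hadoop-aws/aws-sdk jars
> # above should be redundant (and can clash with the shaded classes) once this jar is in lib/.
> RUN wget -O $FLINK_LIB_DIR/flink-s3-fs-hadoop-1.7.1.jar \
>     http://central.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.7.1/flink-s3-fs-hadoop-1.7.1.jar
> {code}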
> The job itself is a simple word count:
> {code:java}
> import org.apache.commons.lang3.RandomStringUtils;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
> public class WordCount {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         String[] words1 = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             words.add(words1[rnd.nextInt(words1.length)]);
>         }
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>         env.execute();
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> }
> {code}
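> The state backend itself is only configured through the container args below; a rough in-code equivalent (bucket path is a placeholder matching the -D properties) would be:
> {code:java}
> // Same effect as -Dstate.backend=rocksdb, -Dstate.checkpoints.dir=... and
> // -Dstate.backend.incremental=true, using the already-imported RocksDBStateBackend
> env.setStateBackend((StateBackend) new RocksDBStateBackend("s3a://<whatever>/checkpoints/{cluster.name}", true));
> {code}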
> Job manager Kubernetes args:
> It is a template, so disregard the placeholders.
> {code:java}
> "job-cluster",
> "--job-classname", "{classname}",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dparallelism.default=2",
> "-Dblob.server.port=6124",
> "-Dqueryable-state.server.ports=6125",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"
> {code}
> Task manager Kubernetes args:
> Again, templated.
> {code:java}
> ["task-manager",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"]
> {code}
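> For completeness, the same settings expressed as static flink-conf.yaml entries (same placeholder paths as above) would presumably be:
> {code:java}
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /tmp/{cluster.name}/
> state.checkpoints.dir: s3a://<whatever>/checkpoints/{cluster.name}
> state.savepoints.dir: s3a://<whatever>/savepoints/{cluster.name}
> {code}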
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)