[ https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mario Georgiev updated FLINK-11429:
-----------------------------------
    Description: 
Hello,

The problem is: if I put the core-site.xml somewhere in the Flink image and point flink-conf.yaml at its path, it does not get picked up and I get an exception:
{code:java}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	... 31 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
	... 48 more
Caused by: java.net.SocketException: Network unreachable (connect failed)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
	at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
	at sun.net.www.http.HttpClient.New(HttpClient.java:339)
	at sun.net.www.http.HttpClient.New(HttpClient.java:357)
	at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
	at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
	at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
{code}
However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the Dockerfile, they do get picked up and it works. Why is the core-site.xml being disregarded? Even if I don't copy the core-site.xml at all, it works with the ENV variables alone.

It looks like the file is on the classpath, but it is entirely disregarded:

 
{code:java}
Classpath: /opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:
{code}
 
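For reference, the ENV-based workaround that does work looks like the following in the Dockerfile (values are placeholders; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the exact names the SDK's EnvironmentVariableCredentialsProvider reads):
{code}
# Workaround sketch with placeholder values.
# These exact variable names are what EnvironmentVariableCredentialsProvider checks.
ENV AWS_ACCESS_KEY_ID=<access-key>
ENV AWS_SECRET_ACCESS_KEY=<secret-key>
{code}
This is the core-site.xml I am shipping: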
{code:java}
<configuration>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.s3a.access.key</name>
        <description>AWS access key ID.
            Omit for IAM role-based or provider-based authentication.</description>
        <value><hidden></value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <description>AWS secret key.
            Omit for IAM role-based or provider-based authentication.</description>
        <value><hidden></value>
    </property>


    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>


</configuration>
{code}
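For what it is worth, the Flink S3 filesystem docs suggest that with flink-s3-fs-hadoop the credentials can also go straight into flink-conf.yaml, without a core-site.xml at all (untested on my side; keys as named in the docs, placeholder values):
{code}
# flink-conf.yaml -- these keys are mirrored into the shaded Hadoop S3A config
s3.access-key: <access-key>
s3.secret-key: <secret-key>
{code}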
I am building the Kubernetes standalone image as follows.

Dockerfile:
{code:java}
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

FROM openjdk:8-jre-alpine

# Install requirements
# Modification to original Dockerfile to support rocksdb
# RUN apk add --no-cache bash snappy
# This is a fix for RocksDB compatibility


# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV PATH $PATH:$FLINK_HOME/bin
ENV FLINK_CONF $FLINK_HOME/conf
ENV FLINK_OPT $FLINK_HOME/opt
ENV FLINK_HADOOP_CONF /hadoop/conf

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET

# Install build dependencies and flink
ADD $flink_dist $FLINK_INSTALL_PATH
ADD $job_jar $FLINK_INSTALL_PATH/job.jar

RUN set -x && \
  ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
  ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
  chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
  chown -h flink:flink $FLINK_HOME

# Modification to original Dockerfile
RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'

COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF

RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml

RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml

RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml

RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR

COPY docker-entrypoint.sh /

RUN chmod +x docker-entrypoint.sh

RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar \
    https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar \
    http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
# Transitive dependency of aws-java-sdk-s3
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar \
    http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar \
    http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar \
    http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar \
    http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar \
    http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar \
    http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar \
    http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar \
    http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
# Modification to original Dockerfile

USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]
{code}
The job is a simple WordCount:
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        String[] words1 = new String[]{
                "football",
                "soccer",
                "billiards",
                "snooker",
                "tennis",
                "handball",
                "basketball"
        };

        List<String> words = new ArrayList<>();
        Random rnd = new Random();
        for (int i = 0; i < 500000; i++) {
            // nextInt's bound is exclusive, so use the full array length
            words.add(words1[rnd.nextInt(words1.length)]);
        }

        DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));

        src.map(str -> str.toLowerCase())
                .flatMap(new Splitter())
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
                .keyBy(0)
                .sum(1)
                .print();

        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
{code}
Job manager Kubernetes args (a template, so disregard the placeholders):
{code:java}
       "job-cluster",
       "--job-classname", "{classname}",
       "-Djobmanager.rpc.address={cluster.name}-jobmanager",
       "-Dparallelism.default=2",
       "-Dblob.server.port=6124",
       "-Dqueryable-state.server.ports=6125",
       "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
       "-Dstate.backend=rocksdb",
       "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
       "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
       "-Dstate.backend.incremental=true"
{code}
Task manager Kubernetes args (again, templated):
{code:java}
       ["task-manager",
       "-Djobmanager.rpc.address={cluster.name}-jobmanager",
       "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
       "-Dstate.backend=rocksdb",
       "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
       "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
       "-Dstate.backend.incremental=true"]
{code}
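If it helps with reproducing, the same state settings expressed as flink-conf.yaml entries (placeholders kept as in the templates above) would be roughly:
{code}
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /tmp/{cluster.name}/
state.checkpoints.dir: s3a://<whatever>/checkpoints/{cluster.name}
state.savepoints.dir: s3a://<whatever>/savepoints/{cluster.name}
{code}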
 

 

> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
>                 Key: FLINK-11429
>                 URL: https://issues.apache.org/jira/browse/FLINK-11429
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: Mario Georgiev
>            Priority: Critical
>
> Hello,
> Problem is, if i put the core-site.xml somewhere and add it in the flink 
> image, put the path to it in the flink-conf.yaml it does not get picked and i 
> get an exception 
> {code:java}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
> ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:589)
> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
> at sun.net.www.http.HttpClient.New(HttpClient.java:339)
> at sun.net.www.http.HttpClient.New(HttpClient.java:357)
> at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
> at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
> However, if I put the ACCESS_KEY and SECRET_KEY in environment variables in 
> the Dockerfile, they are picked up and it works. Why is the core-site.xml 
> being disregarded? Even if I don't copy the core-site.xml at all, it works 
> with just the ENV variables.
> The configuration directory appears to be on the classpath, yet the file is 
> entirely disregarded:
>  
> {code:java}
> Classpath: 
> /opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:
> {code}
>  
> {code:java}
> <configuration>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>     <property>
>         <name>fs.s3a.access.key</name>
>         <description>AWS access key ID. Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.secret.key</name>
>         <description>AWS secret key. Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.aws.credentials.provider</name>
>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>     </property>
> </configuration>
> {code}
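As a sanity check that the file itself is well-formed XML and actually contains the expected keys (independent of whether Hadoop ever loads it), a small stdlib-only parser can be pointed at it. This is just a diagnostic sketch of the `<property><name/><value/></property>` layout, not Hadoop's own loading logic, and the class name is made up:

```java
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class CoreSiteCheck {
    // Read Hadoop-style <configuration><property><name/><value/></property>... XML
    static Map<String, String> parse(InputStream in) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
        Map<String, String> props = new LinkedHashMap<>();
        NodeList nodes = doc.getElementsByTagName("property");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element p = (Element) nodes.item(i);
            String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
            String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
            props.put(name, value);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Inline sample; in practice point this at /hadoop/conf/core-site.xml
        String xml = "<configuration><property><name>fs.s3a.access.key</name>"
                + "<value>dummy</value></property></configuration>";
        Map<String, String> props =
                parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        System.out.println(props); // {fs.s3a.access.key=dummy}
    }
}
```

If the keys print correctly here, the file content is fine and the problem is purely on the loading side.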
> I am building the kubernetes standalone image as following :
> Dockerfile : 
> {code:java}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
> # Install requirements
> # Modification to original Dockerfile to support rocksdb
> # RUN apk add --no-cache bash snappy
> # This is a fix for RocksDB compatibility
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> ENV FLINK_OPT $FLINK_HOME/opt
> ENV FLINK_HADOOP_CONF /hadoop/conf
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
>   ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
>   ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
>   addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
>   chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
>   chown -h flink:flink $FLINK_HOME
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
> COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
> ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF
> RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml
> RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> #Transitive Dependency of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
> #Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
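A small image-hygiene aside, unrelated to the credentials problem: the successive `RUN echo ... >> flink-conf.yaml` instructions above each create an image layer, and they could be collapsed into one. A sketch of the same appends (no behavior change intended):

```dockerfile
# Sketch: the same flink-conf.yaml appends in a single layer
RUN { \
      echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF"; \
      echo "akka.ask.timeout: 30 min"; \
      echo "akka.client.timeout: 30 min"; \
      echo "web.timeout: 180000"; \
    } >> $FLINK_CONF/flink-conf.yaml
```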
>  
>  
>  
> {code:java}
> import org.apache.commons.lang3.RandomStringUtils;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
> public class WordCount {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         String[] words1 = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             // nextInt(bound) is exclusive of bound, so use the full array length
>             words.add(words1[rnd.nextInt(words1.length)]);
>         }
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>         env.execute();
>     }
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> {code}
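Worth double-checking in the word-sampling loop above: `java.util.Random.nextInt(bound)` returns values in `[0, bound)`, so only `nextInt(words1.length)` makes every entry reachable; with a bound of `words1.length - 1` the last word would never be drawn. A minimal stdlib demo:

```java
import java.util.Random;

public class NextIntBound {
    public static void main(String[] args) {
        String[] words = {"football", "soccer", "basketball"};
        Random rnd = new Random(42);
        boolean sawLast = false;
        for (int i = 0; i < 10_000; i++) {
            // bound is exclusive: indices 0..words.length-1 are all reachable
            if (rnd.nextInt(words.length) == words.length - 1) {
                sawLast = true;
            }
        }
        System.out.println(sawLast); // true: the last index shows up with the full-length bound
    }
}
```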
> Job manager Kubernetes args:
>  It is a template, so disregard the placeholders
> {code:java}
>        "job-cluster",
>        "--job-classname", "{classname}",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dparallelism.default=2",
>        "-Dblob.server.port=6124",
>        "-Dqueryable-state.server.ports=6125",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"
> {code}
> Task manager Kubernetes args: 
>  Again, templated
> {code:java}
>        ["task-manager",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
