[
https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mario Georgiev updated FLINK-11429:
-----------------------------------
Description:
Hello,
The problem is: if I put core-site.xml somewhere in the Flink image and point flink-conf.yaml at its directory, it does not get picked up and I get the following exception:
{code:java}
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 31 more
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
    ... 48 more
Caused by: java.net.SocketException: Network unreachable (connect failed)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
{code}
However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the Dockerfile, they get picked up and it works. Why is the core-site.xml being disregarded? Even if I don't copy the core-site.xml at all, it works with just the ENV variables.
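For reference, the workaround that does work is just exporting the credentials in the image, roughly like this (the variable names below are the standard ones read by the AWS SDK's EnvironmentVariableCredentialsProvider; values redacted):
{code:java}
# Workaround (sketch): credentials as environment variables instead of core-site.xml.
# These are the standard names the AWS SDK's EnvironmentVariableCredentialsProvider reads.
ENV AWS_ACCESS_KEY_ID=<hidden>
ENV AWS_SECRET_ACCESS_KEY=<hidden>
{code}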
The Hadoop conf directory (/hadoop/conf) looks like it is on the classpath, but it is entirely disregarded:
{code:java}
- Classpath:
/opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:{code}
{code:java}
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <!-- Comma separated list of local directories used to buffer
       large results prior to transmitting them to S3. -->
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <description>AWS access key ID.
      Omit for IAM role-based or provider-based authentication.</description>
    <value><hidden></value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <description>AWS secret key.
      Omit for IAM role-based or provider-based authentication.</description>
    <value><hidden></value>
  </property>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
</configuration>
{code}
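A quick way to see whether the file is visible on the classpath at all is to load a plain Hadoop Configuration and dump the s3a keys. This is only a diagnostic sketch and says nothing about how the shaded flink-s3-fs-hadoop filesystem resolves its own configuration:
{code:java}
import org.apache.hadoop.conf.Configuration;

// Diagnostic sketch only: a plain Hadoop Configuration loads core-site.xml from the classpath.
public class CoreSiteCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Where (and whether) core-site.xml was found on the classpath.
        System.out.println("core-site.xml: " + conf.getResource("core-site.xml"));
        // These are non-null only if the file was actually picked up.
        System.out.println("fs.s3a.aws.credentials.provider: " + conf.get("fs.s3a.aws.credentials.provider"));
        System.out.println("fs.s3a.access.key set: " + (conf.get("fs.s3a.access.key") != null));
    }
}
{code}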
I am building the Kubernetes standalone image as follows.
Dockerfile:
{code:java}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
FROM openjdk:8-jre-alpine
# Install requirements
# Modification to original Dockerfile to support rocksdb
# RUN apk add --no-cache bash snappy
# This is a fix for RocksDB compatibility
# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV PATH $PATH:$FLINK_HOME/bin
ENV FLINK_CONF $FLINK_HOME/conf
ENV FLINK_OPT $FLINK_HOME/opt
ENV FLINK_HADOOP_CONF /hadoop/conf
# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET
# Install build dependencies and flink
ADD $flink_dist $FLINK_INSTALL_PATH
ADD $job_jar $FLINK_INSTALL_PATH/job.jar
RUN set -x && \
ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
chown -h flink:flink $FLINK_HOME
# Modification to original Dockerfile
RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF
RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml
RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml
RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR
COPY docker-entrypoint.sh /
RUN chmod +x docker-entrypoint.sh
RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
# Transitive dependency of aws-java-sdk-s3
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
#Modification to original Dockerfile
USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]
{code}
The job itself is a simple word count used to exercise checkpointing:
{code:java}
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5s, exactly-once, so state is written to the configured s3a directory.
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        String[] words1 = new String[]{
                "football",
                "soccer",
                "billiards",
                "snooker",
                "tennis",
                "handball",
                "basketball"
        };

        // Generate 500,000 words picked at random from the array above as the job's input.
        List<String> words = new ArrayList<>();
        Random rnd = new Random();
        for (int i = 0; i < 500000; i++) {
            words.add(words1[rnd.nextInt(words1.length - 1)]);
        }

        DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
        src.map(str -> str.toLowerCase())
                .flatMap(new Splitter())
                .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
                .keyBy(0)
                .sum(1)
                .print();

        env.execute();
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
{code}
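The RocksDB-related imports above are unused; the state backend comes from the -D arguments below. A rough programmatic equivalent inside main() would be the following, with the same placeholder bucket:
{code:java}
// Rough programmatic equivalent of -Dstate.backend=rocksdb / -Dstate.checkpoints.dir=...
// (placeholder bucket; it goes through the same s3a credential resolution that fails here).
env.setStateBackend(new RocksDBStateBackend("s3a://<whatever>/checkpoints/{cluster.name}", true));
{code}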
Job manager Kubernetes args (it is a template, so disregard the placeholders):
{code:java}
"job-cluster",
"--job-classname", "{classname}",
"-Djobmanager.rpc.address={cluster.name}-jobmanager",
"-Dparallelism.default=2",
"-Dblob.server.port=6124",
"-Dqueryable-state.server.ports=6125",
"-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
"-Dstate.backend=rocksdb",
"-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
"-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
"-Dstate.backend.incremental=true"
{code}
Task manager Kubernetes args (again, templated):
{code:java}
["task-manager",
"-Djobmanager.rpc.address={cluster.name}-jobmanager",
"-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
"-Dstate.backend=rocksdb",
"-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
"-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
"-Dstate.backend.incremental=true"]
{code}
> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
> Key: FLINK-11429
> URL: https://issues.apache.org/jira/browse/FLINK-11429
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.7.1
> Reporter: Mario Georgiev
> Priority: Critical
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)