[
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371403#comment-16371403
]
chris snow edited comment on FLINK-8543 at 2/21/18 1:33 PM:
------------------------------------------------------------
Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
    S3AFileSystem fs,
    String key,
    Progressable progress)
    throws IOException {
  this.key = key;
  this.progress = progress;
  this.fs = fs;
  backupFile = fs.createTmpFileForWrite("output-",
      LocalDirAllocator.SIZE_UNKNOWN, conf);
  LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
      key, backupFile);
  this.backupStream = new BufferedOutputStream(
      new FileOutputStream(backupFile));
}

// ** print extra debug output **
void printStackTrace() {
  StringBuilder sb = new StringBuilder();
  sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
  for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
    sb.append("\n " + ste);
  }
  // I'm being lazy - log the stacktrace as an error so it will get logged
  // without having to change the logger configuration
  LOG.error(sb.toString());
}

/**
 * Check for the filesystem being open.
 * @throws IOException if the filesystem is closed.
 */
void checkOpen() throws IOException {
  if (closed.get()) {
    printStackTrace();
    throw new IOException(
        "Output Stream closed. Thread id: " +
        Thread.currentThread().getId() + " key: " + key);
  }
}

@Override
public void flush() throws IOException {
  checkOpen();
  backupStream.flush();
}

@Override
public void close() throws IOException {
  printStackTrace();
  if (closed.getAndSet(true)) {
    return;
  }
  backupStream.close();
  LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
  try {
    final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
    Upload upload = fs.putObject(
        fs.newPutObjectRequest(
            key,
            om,
            backupFile));
    ProgressableProgressListener listener =
        new ProgressableProgressListener(fs, key, upload, progress);
    upload.addProgressListener(listener);
    upload.waitForUploadResult();
    listener.uploadCompleted();
    // This will delete unnecessary fake parent directories
    fs.finishedWrite(key);
  } catch (InterruptedException e) {
    throw (InterruptedIOException) new InterruptedIOException(e.toString())
        .initCause(e);
  } catch (AmazonClientException e) {
    throw translateException("saving output", key, e);
  } finally {
    if (!backupFile.delete()) {
      LOG.warn("Could not delete temporary s3a file: {}", backupFile);
    }
    super.close();
  }
  LOG.debug("OutputStream for key '{}' upload complete", key);
}

@Override
public void write(int b) throws IOException {
  checkOpen();
  backupStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
  checkOpen();
  backupStream.write(b, off, len);
}
}
{code}
And here is one of the YARN logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO org.apache.flink.yarn.YarnTaskManagerRunner
-
--------------------------------------------------------------------------------
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2,
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- OS current user: clsadmin
2018-02-21 12:43:12,162 WARN org.apache.hadoop.util.NativeCodeLoader
- Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation -
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Hadoop version: 2.7.3.2.6.2.0-205
...
omitted for brevity
...
2018-02-21 12:43:12,267 INFO org.apache.flink.yarn.YarnTaskManagerRunner
-
--------------------------------------------------------------------------------
2018-02-21 12:43:12,270 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Registered UNIX signal handlers for [TERM, HUP, INT]
2018-02-21 12:43:12,705 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Loading configuration from .
...
omitted for brevity
...
property: taskmanager.memory.preallocate, false
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current working/local Directory:
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current working Directory:
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- TM: remoteKeytabPath obtained null
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- TM: remoteKeytabPrincipal obtained null
2018-02-21 12:43:12,779 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Setting directories for temporary file
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,780 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- YARN daemon is running as: clsadmin Yarn client user obtainer:
clsadmin
2018-02-21 12:43:12,781 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- ResourceID assigned for this container:
container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,857 INFO
org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set
to clsadmin (auth:SIMPLE)
2018-02-21 12:43:13,007 INFO
org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to
select the network interface and address to use by connecting to the leading
JobManager.
2018-02-21 12:43:13,008 INFO
org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager
will try to connect for 10000 milliseconds before falling back to heuristics
2018-02-21 12:43:13,012 INFO org.apache.flink.runtime.net.ConnectionUtils
- Retrieved new target address
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:38012.
2018-02-21 12:43:13,032 INFO org.apache.flink.runtime.taskmanager.TaskManager
- TaskManager will use hostname/address
'chs-van-484-dn001.bi.services.us-south.bluemix.net' (172.16.16.130) for
communication.
2018-02-21 12:43:13,042 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager
2018-02-21 12:43:13,043 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager actor system at
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907.
2018-02-21 12:43:13,045 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Trying to start actor system at
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907
2018-02-21 12:43:13,977 INFO akka.event.slf4j.Slf4jLogger
- Slf4jLogger started
2018-02-21 12:43:14,145 INFO akka.remote.Remoting
- Starting remoting
2018-02-21 12:43:14,504 INFO akka.remote.Remoting
- Remoting started; listening on addresses
:[akka.tcp://[email protected]:38907]
2018-02-21 12:43:14,522 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Actor system started at
akka.tcp://[email protected]:38907
2018-02-21 12:43:14,539 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics
reporter configured, no metrics will be exposed/reported.
2018-02-21 12:43:14,544 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager actor
2018-02-21 12:43:14,552 INFO
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig
[server address:
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130, server port:
0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO,
number of server threads: 1 (manual), number of client threads: 1 (manual),
server connect backlog: 0 (use Netty's default), client connect timeout (sec):
120, send/receive buffer size (bytes): 0 (use Netty's default)]
2018-02-21 12:43:14,559 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have
a max timeout of 10000 ms
2018-02-21 12:43:14,568 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file
directory
'/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,568 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file
directory
'/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,649 INFO
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB
for network buffer pool (number of memory segments: 2048, bytes per segment:
32768).
2018-02-21 12:43:14,878 WARN
org.apache.flink.runtime.query.QueryableStateUtils - Could not load
Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is
not in the classpath. Please put the corresponding jar from the opt to the lib
folder.
2018-02-21 12:43:14,879 WARN
org.apache.flink.runtime.query.QueryableStateUtils - Could not load
Queryable State Server. Probable reason: flink-queryable-state-runtime is not
in the classpath. Please put the corresponding jar from the opt to the lib
folder.
2018-02-21 12:43:14,881 INFO
org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the
network environment and its components.
2018-02-21 12:43:14,965 INFO
org.apache.flink.runtime.io.network.netty.NettyClient - Successful
initialization (took 69 ms).
2018-02-21 12:43:15,049 INFO
org.apache.flink.runtime.io.network.netty.NettyServer - Successful
initialization (took 84 ms). Listening on SocketAddress /172.16.16.130:33982.
2018-02-21 12:43:15,135 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting
managed memory to 0.7 of the currently free heap space (234 MB), memory will be
allocated lazily.
2018-02-21 12:43:15,148 INFO
org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager
uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-9dbfdfda-909a-45da-aab7-d82d826edf4b
for spill files.
2018-02-21 12:43:15,148 INFO
org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager
uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-d4a1da3f-22a4-4a62-9589-050c2343dc59
for spill files.
2018-02-21 12:43:15,156 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-80d88a48-c391-44ec-b61d-cb293b20fe5a
2018-02-21 12:43:15,156 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-58fa3a64-ae94-44c9-b491-98c5dd89d426
2018-02-21 12:43:15,319 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-e6f21f36-1929-4e9c-a6e6-9225907dc7e4
2018-02-21 12:43:15,320 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-47bc021d-52c9-4f35-abe1-dd9024f08c8f
2018-02-21 12:43:15,344 INFO org.apache.flink.yarn.YarnTaskManager
- Starting TaskManager actor at
akka://flink/user/taskmanager#-1252244271.
2018-02-21 12:43:15,345 INFO org.apache.flink.yarn.YarnTaskManager
- TaskManager data connection information:
container_e01_1519207944666_0008_01_000002 @
chs-van-484-dn001.bi.services.us-south.bluemix.net (dataPort=33982)
2018-02-21 12:43:15,346 INFO org.apache.flink.yarn.YarnTaskManager
- TaskManager has 1 task slot(s).
2018-02-21 12:43:15,348 INFO org.apache.flink.yarn.YarnTaskManager
- Memory usage stats: [HEAP: 77/406/406 MB, NON HEAP: 35/36/-1 MB
(used/committed/max)]
2018-02-21 12:43:15,363 INFO org.apache.flink.yarn.YarnTaskManager
- Trying to register at JobManager
akka.tcp://[email protected]:38012/user/jobmanager
(attempt 1, timeout: 500 milliseconds)
2018-02-21 12:43:15,711 INFO org.apache.flink.yarn.YarnTaskManager
- Successful registration at JobManager
(akka.tcp://[email protected]:38012/user/jobmanager),
starting network stack and library cache.
2018-02-21 12:43:15,723 INFO org.apache.flink.yarn.YarnTaskManager
- Determined BLOB server address to be
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:41971.
Starting BLOB cache.
2018-02-21 12:43:15,734 INFO org.apache.flink.runtime.blob.PermanentBlobCache
- Created BLOB cache storage directory
/tmp/blobStore-cd4d61de-14b1-454e-9cd6-f91ed292bd04
2018-02-21 12:43:15,741 INFO org.apache.flink.runtime.blob.TransientBlobCache
- Created BLOB cache storage directory
/tmp/blobStore-b489e08d-4731-4751-9034-567ffef2d9a8
2018-02-21 12:43:16,333 INFO org.apache.flink.yarn.YarnTaskManager
- Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)
2018-02-21 12:43:16,334 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from CREATED to DEPLOYING.
2018-02-21 12:43:16,335 INFO org.apache.flink.runtime.taskmanager.Task
- Creating FileSystem stream leak safety net for task Source:
Custom Source -> Map -> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740)
[DEPLOYING]
2018-02-21 12:43:16,347 INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task Source: Custom Source -> Map -> Sink:
Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,382 INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: Source: Custom Source -> Map ->
Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,390 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from DEPLOYING to RUNNING.
2018-02-21 12:43:16,401 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask - No state
backend has been configured, using default state backend (Memory / JobManager)
2018-02-21 12:43:18,711 INFO org.apache.hadoop.conf.Configuration.deprecation
- fs.s3a.server-side-encryption-key is deprecated. Instead, use
fs.s3a.server-side-encryption.key
2018-02-21 12:43:19,631 WARN
org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The
short-circuit local reads feature cannot be used because libhadoop cannot be
loaded.
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** I think this can be ignored. We do not see errors later on for this file.
***
2018-02-21 12:43:20,000 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 44 key:
user/clsadmin/72566a07-0f62-42b2-84fe-99d2355477f7
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:592)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:745)
2018-02-21 12:43:21,357 INFO
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state to
restore for the BucketingSink (taskIdx=0).
2018-02-21 12:43:21,412 INFO org.apache.flink.api.java.typeutils.TypeExtractor
- class
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does
not contain a setter for field topic
2018-02-21 12:43:21,412 INFO org.apache.flink.api.java.typeutils.TypeExtractor
- class
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is
not a valid POJO type because not all fields are valid POJO fields.
2018-02-21 12:43:21,422 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No
restore state for FlinkKafkaConsumer.
2018-02-21 12:43:21,472 INFO org.apache.kafka.clients.consumer.ConsumerConfig
- ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers =
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
check.crcs = true
client.id =
...
omitted for brevity
...
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-21 12:43:21,528 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully
logged in.
2018-02-21 12:43:21,613 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka version : 0.11.0.2
2018-02-21 12:43:21,613 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:21,938 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
subtask 0 will start reading the following 1 partitions from the committed
group offsets in Kafka: [KafkaTopicPartition{topic='transactions_load',
partition=0}]
2018-02-21 12:43:21,950 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink:
Unnamed (1/1),5,Flink Task Threads] took 4 ms.
2018-02-21 12:43:21,968 INFO org.apache.kafka.clients.consumer.ConsumerConfig
- ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers =
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
check.crcs = true
client.id =
...
omitted for brevity
...
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-21 12:43:21,974 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully
logged in.
2018-02-21 12:43:21,982 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka version : 0.11.0.2
2018-02-21 12:43:21,983 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:22,078 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered
coordinator kafka08-prod02.messagehub.services.us-south.bluemix.net:9093 (id:
2147483640 rack: null) for group kafka-flink-iae-streaming-demo.
2018-02-21 12:43:22,273 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink:
Unnamed (1/1),5,Flink Task Threads] took 320 ms.
2018-02-21 12:43:22,316 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 34 ms.
...
repeated code omitted
...
2018-02-21 12:45:20,368 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
2018-02-21 12:45:20,368 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This is the code that first closes the output stream, resulting in the
*** later flush throwing an exception
***
2018-02-21 12:45:21,426 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:100)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:161)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This stacktrace is printed immediately before the exception
***
2018-02-21 12:45:22,155 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:93)
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
java.io.DataOutputStream.flush(DataOutputStream.java:123)
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
2018-02-21 12:45:22,155 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Source: Custom Source -> Map
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
***
*** vvvv ACTUAL ERROR vvvv ***
***
2018-02-21 12:45:22,157 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed. Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed. Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
at
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:95)
at
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
at
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
at
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
at
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
... 7 more
2018-02-21 12:45:22,172 INFO org.apache.flink.runtime.taskmanager.Task
- Triggering cancellation of task code Source: Custom Source -> Map
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
2018-02-21 12:45:22,179 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut
down timer service
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
...
{code}
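To make the failure sequence in the last two stack traces concrete, here is a minimal standalone sketch (class names are hypothetical stand-ins, not the real S3A/Avro classes): the bucketing sink closes the underlying stream directly, and the wrapping writer's own close() then flushes that stream, tripping the checkOpen() guard:
{code:java}
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for S3AOutputStream: flush() fails once the stream is closed.
class CloseCheckedStream extends OutputStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  @Override
  public void write(int b) throws IOException {
    checkOpen();
  }

  @Override
  public void flush() throws IOException {
    checkOpen(); // the call that throws in the traces above
  }

  @Override
  public void close() {
    closed.set(true); // a second close() is a no-op
  }

  private void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException("Output Stream closed");
    }
  }
}

// Stand-in for the Avro writer: its close() flushes the wrapped stream first.
class FlushingWriter extends FilterOutputStream {
  FlushingWriter(OutputStream out) {
    super(out);
  }

  @Override
  public void close() throws IOException {
    out.flush(); // analogous to DataFileWriter.close() calling flush()
    out.close();
  }
}

public class CloseThenFlush {
  public static void main(String[] args) {
    CloseCheckedStream s3a = new CloseCheckedStream();
    FlushingWriter writer = new FlushingWriter(s3a);
    s3a.close(); // outer code closes the wrapped stream first ...
    try {
      writer.close(); // ... so the flush-on-close now throws
    } catch (IOException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
{code}
Running this prints {{caught: Output Stream closed}}, matching the ordering the 12:45:21 (direct close) and 12:45:22 (flush during the writer's close) traces show.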
The job was run with:
{code}
${FLINK_HOME}/bin/flink run -m yarn-cluster -yn 1 $YARN_BACKGROUND
/home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar --kafka-brokers
${KAFKA_BROKERS} --kafka-topic ${KAFKA_TOPIC} --kafka-username
${KAFKA_USERNAME} --kafka-password ${KAFKA_PASSWORD} --kafka-group-id
${KAFKA_GROUP_ID} --output-folder s3://${S3_BUCKET}/${S3_FOLDER}
--output-bucket-format-string ${S3_BUCKET_FORMAT_STRING}
{code}
was (Author: snowch):
Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
S3AFileSystem fs,
String key,
Progressable progress)
throws IOException {
    this.key = key;
    this.progress = progress;
    this.fs = fs;
    backupFile = fs.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN, conf);
    LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
        key, backupFile);
    this.backupStream = new BufferedOutputStream(
        new FileOutputStream(backupFile));
  }

  // ** print extra debug output **
  void printStackTrace() {
    StringBuilder sb = new StringBuilder();
    sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
    for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
      sb.append("\n " + ste);
    }
    // I'm being lazy - log the stacktrace as an error so it will get logged
    // without having to change the logger configuration
    LOG.error(sb.toString());
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
    if (closed.get()) {
      printStackTrace();
      throw new IOException(
          "Output Stream closed. Thread id: " +
          Thread.currentThread().getId() + " key: " + key);
    }
  }

  @Override
  public void flush() throws IOException {
    checkOpen();
    backupStream.flush();
  }

  @Override
  public void close() throws IOException {
    printStackTrace();
    if (closed.getAndSet(true)) {
      return;
    }
    backupStream.close();
    LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
    try {
      final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
      Upload upload = fs.putObject(
          fs.newPutObjectRequest(
              key,
              om,
              backupFile));
      ProgressableProgressListener listener =
          new ProgressableProgressListener(fs, key, upload, progress);
      upload.addProgressListener(listener);
      upload.waitForUploadResult();
      listener.uploadCompleted();
      // This will delete unnecessary fake parent directories
      fs.finishedWrite(key);
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException(e.toString())
          .initCause(e);
    } catch (AmazonClientException e) {
      throw translateException("saving output", key, e);
    } finally {
      if (!backupFile.delete()) {
        LOG.warn("Could not delete temporary s3a file: {}", backupFile);
      }
      super.close();
    }
    LOG.debug("OutputStream for key '{}' upload complete", key);
  }

  @Override
  public void write(int b) throws IOException {
    checkOpen();
    backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    checkOpen();
    backupStream.write(b, off, len);
  }
}
{code}
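For reference, the guard the instrumentation exercises can be reduced to a few lines. This is an illustrative toy (`GuardedStream` / `GuardDemo` are made up for this sketch, not Hadoop classes): a stream records a closed flag, a second close() is a silent no-op, but any flush() or write() arriving after close() fails, which is exactly how the "Output Stream closed" IOException surfaces.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy stand-in (not the Hadoop class) for the S3AOutputStream guard:
// once close() flips the flag, flush() and write() fail fast.
class GuardedStream extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();

    private void checkOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Output Stream closed");
        }
    }

    @Override
    public void write(int b) throws IOException {
        checkOpen();
        delegate.write(b);
    }

    @Override
    public void flush() throws IOException {
        checkOpen();
        delegate.flush();
    }

    @Override
    public void close() {
        // a second close() is a silent no-op, but a late flush() is not
        closed.getAndSet(true);
    }
}

public class GuardDemo {
    public static void main(String[] args) throws IOException {
        GuardedStream s = new GuardedStream();
        s.write(1);
        s.close();
        s.close(); // fine: close() is idempotent
        try {
            s.flush(); // but a flush after close is an error
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Output Stream closed"
        }
    }
}
```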
And here is one of the YARN logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO org.apache.flink.yarn.YarnTaskManagerRunner
-
--------------------------------------------------------------------------------
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2,
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- OS current user: clsadmin
2018-02-21 12:43:12,162 WARN org.apache.hadoop.util.NativeCodeLoader
- Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation -
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Hadoop version: 2.7.3.2.6.2.0-205
...
omitted for brevity
...
2018-02-21 12:43:12,267 INFO org.apache.flink.yarn.YarnTaskManagerRunner
-
--------------------------------------------------------------------------------
2018-02-21 12:43:12,270 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Registered UNIX signal handlers for [TERM, HUP, INT]
2018-02-21 12:43:12,705 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Loading configuration from .
...
omitted for brevity
...
property: taskmanager.memory.preallocate, false
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current working/local Directory:
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Current working Directory:
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- TM: remoteKeytabPath obtained null
2018-02-21 12:43:12,778 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- TM: remoteKeytabPrincipal obtained null
2018-02-21 12:43:12,779 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- Setting directories for temporary file
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,780 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- YARN daemon is running as: clsadmin Yarn client user obtainer:
clsadmin
2018-02-21 12:43:12,781 INFO org.apache.flink.yarn.YarnTaskManagerRunner
- ResourceID assigned for this container:
container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,857 INFO
org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set
to clsadmin (auth:SIMPLE)
2018-02-21 12:43:13,007 INFO
org.apache.flink.runtime.util.LeaderRetrievalUtils - Trying to
select the network interface and address to use by connecting to the leading
JobManager.
2018-02-21 12:43:13,008 INFO
org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager
will try to connect for 10000 milliseconds before falling back to heuristics
2018-02-21 12:43:13,012 INFO org.apache.flink.runtime.net.ConnectionUtils
- Retrieved new target address
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:38012.
2018-02-21 12:43:13,032 INFO org.apache.flink.runtime.taskmanager.TaskManager
- TaskManager will use hostname/address
'chs-van-484-dn001.bi.services.us-south.bluemix.net' (172.16.16.130) for
communication.
2018-02-21 12:43:13,042 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager
2018-02-21 12:43:13,043 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager actor system at
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907.
2018-02-21 12:43:13,045 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Trying to start actor system at
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907
2018-02-21 12:43:13,977 INFO akka.event.slf4j.Slf4jLogger
- Slf4jLogger started
2018-02-21 12:43:14,145 INFO akka.remote.Remoting
- Starting remoting
2018-02-21 12:43:14,504 INFO akka.remote.Remoting
- Remoting started; listening on addresses
:[akka.tcp://[email protected]:38907]
2018-02-21 12:43:14,522 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Actor system started at
akka.tcp://[email protected]:38907
2018-02-21 12:43:14,539 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics
reporter configured, no metrics will be exposed/reported.
2018-02-21 12:43:14,544 INFO org.apache.flink.runtime.taskmanager.TaskManager
- Starting TaskManager actor
2018-02-21 12:43:14,552 INFO
org.apache.flink.runtime.io.network.netty.NettyConfig - NettyConfig
[server address:
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130, server port:
0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO,
number of server threads: 1 (manual), number of client threads: 1 (manual),
server connect backlog: 0 (use Netty's default), client connect timeout (sec):
120, send/receive buffer size (bytes): 0 (use Netty's default)]
2018-02-21 12:43:14,559 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration - Messages have
a max timeout of 10000 ms
2018-02-21 12:43:14,568 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file
directory
'/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,568 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Temporary file
directory
'/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,649 INFO
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool - Allocated 64 MB
for network buffer pool (number of memory segments: 2048, bytes per segment:
32768).
2018-02-21 12:43:14,878 WARN
org.apache.flink.runtime.query.QueryableStateUtils - Could not load
Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is
not in the classpath. Please put the corresponding jar from the opt to the lib
folder.
2018-02-21 12:43:14,879 WARN
org.apache.flink.runtime.query.QueryableStateUtils - Could not load
Queryable State Server. Probable reason: flink-queryable-state-runtime is not
in the classpath. Please put the corresponding jar from the opt to the lib
folder.
2018-02-21 12:43:14,881 INFO
org.apache.flink.runtime.io.network.NetworkEnvironment - Starting the
network environment and its components.
2018-02-21 12:43:14,965 INFO
org.apache.flink.runtime.io.network.netty.NettyClient - Successful
initialization (took 69 ms).
2018-02-21 12:43:15,049 INFO
org.apache.flink.runtime.io.network.netty.NettyServer - Successful
initialization (took 84 ms). Listening on SocketAddress /172.16.16.130:33982.
2018-02-21 12:43:15,135 INFO
org.apache.flink.runtime.taskexecutor.TaskManagerServices - Limiting
managed memory to 0.7 of the currently free heap space (234 MB), memory will be
allocated lazily.
2018-02-21 12:43:15,148 INFO
org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager
uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-9dbfdfda-909a-45da-aab7-d82d826edf4b
for spill files.
2018-02-21 12:43:15,148 INFO
org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager
uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-d4a1da3f-22a4-4a62-9589-050c2343dc59
for spill files.
2018-02-21 12:43:15,156 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-80d88a48-c391-44ec-b61d-cb293b20fe5a
2018-02-21 12:43:15,156 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-58fa3a64-ae94-44c9-b491-98c5dd89d426
2018-02-21 12:43:15,319 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-e6f21f36-1929-4e9c-a6e6-9225907dc7e4
2018-02-21 12:43:15,320 INFO org.apache.flink.runtime.filecache.FileCache
- User file cache uses directory
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-47bc021d-52c9-4f35-abe1-dd9024f08c8f
2018-02-21 12:43:15,344 INFO org.apache.flink.yarn.YarnTaskManager
- Starting TaskManager actor at
akka://flink/user/taskmanager#-1252244271.
2018-02-21 12:43:15,345 INFO org.apache.flink.yarn.YarnTaskManager
- TaskManager data connection information:
container_e01_1519207944666_0008_01_000002 @
chs-van-484-dn001.bi.services.us-south.bluemix.net (dataPort=33982)
2018-02-21 12:43:15,346 INFO org.apache.flink.yarn.YarnTaskManager
- TaskManager has 1 task slot(s).
2018-02-21 12:43:15,348 INFO org.apache.flink.yarn.YarnTaskManager
- Memory usage stats: [HEAP: 77/406/406 MB, NON HEAP: 35/36/-1 MB
(used/committed/max)]
2018-02-21 12:43:15,363 INFO org.apache.flink.yarn.YarnTaskManager
- Trying to register at JobManager
akka.tcp://[email protected]:38012/user/jobmanager
(attempt 1, timeout: 500 milliseconds)
2018-02-21 12:43:15,711 INFO org.apache.flink.yarn.YarnTaskManager
- Successful registration at JobManager
(akka.tcp://[email protected]:38012/user/jobmanager),
starting network stack and library cache.
2018-02-21 12:43:15,723 INFO org.apache.flink.yarn.YarnTaskManager
- Determined BLOB server address to be
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:41971.
Starting BLOB cache.
2018-02-21 12:43:15,734 INFO org.apache.flink.runtime.blob.PermanentBlobCache
- Created BLOB cache storage directory
/tmp/blobStore-cd4d61de-14b1-454e-9cd6-f91ed292bd04
2018-02-21 12:43:15,741 INFO org.apache.flink.runtime.blob.TransientBlobCache
- Created BLOB cache storage directory
/tmp/blobStore-b489e08d-4731-4751-9034-567ffef2d9a8
2018-02-21 12:43:16,333 INFO org.apache.flink.yarn.YarnTaskManager
- Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)
2018-02-21 12:43:16,334 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from CREATED to DEPLOYING.
2018-02-21 12:43:16,335 INFO org.apache.flink.runtime.taskmanager.Task
- Creating FileSystem stream leak safety net for task Source:
Custom Source -> Map -> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740)
[DEPLOYING]
2018-02-21 12:43:16,347 INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task Source: Custom Source -> Map -> Sink:
Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,382 INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: Source: Custom Source -> Map ->
Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,390 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from DEPLOYING to RUNNING.
2018-02-21 12:43:16,401 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask - No state
backend has been configured, using default state backend (Memory / JobManager)
2018-02-21 12:43:18,711 INFO org.apache.hadoop.conf.Configuration.deprecation
- fs.s3a.server-side-encryption-key is deprecated. Instead, use
fs.s3a.server-side-encryption.key
2018-02-21 12:43:19,631 WARN
org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The
short-circuit local reads feature cannot be used because libhadoop cannot be
loaded.
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
***
2018-02-21 12:43:20,000 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 44 key:
user/clsadmin/72566a07-0f62-42b2-84fe-99d2355477f7
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:592)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:745)
2018-02-21 12:43:21,357 INFO
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state to
restore for the BucketingSink (taskIdx=0).
2018-02-21 12:43:21,412 INFO org.apache.flink.api.java.typeutils.TypeExtractor
- class
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does
not contain a setter for field topic
2018-02-21 12:43:21,412 INFO org.apache.flink.api.java.typeutils.TypeExtractor
- class
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is
not a valid POJO type because not all fields are valid POJO fields.
2018-02-21 12:43:21,422 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - No
restore state for FlinkKafkaConsumer.
2018-02-21 12:43:21,472 INFO org.apache.kafka.clients.consumer.ConsumerConfig
- ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers =
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
check.crcs = true
client.id =
...
omitted for brevity
...
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-21 12:43:21,528 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully
logged in.
2018-02-21 12:43:21,613 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka version : 0.11.0.2
2018-02-21 12:43:21,613 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:21,938 INFO
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer
subtask 0 will start reading the following 1 partitions from the committed
group offsets in Kafka: [KafkaTopicPartition{topic='transactions_load',
partition=0}]
2018-02-21 12:43:21,950 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink:
Unnamed (1/1),5,Flink Task Threads] took 4 ms.
2018-02-21 12:43:21,968 INFO org.apache.kafka.clients.consumer.ConsumerConfig
- ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers =
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093,
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
check.crcs = true
client.id =
...
omitted for brevity
...
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-02-21 12:43:21,974 INFO
org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully
logged in.
2018-02-21 12:43:21,982 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka version : 0.11.0.2
2018-02-21 12:43:21,983 INFO org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:22,078 INFO
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered
coordinator kafka08-prod02.messagehub.services.us-south.bluemix.net:9093 (id:
2147483640 rack: null) for group kafka-flink-iae-streaming-demo.
2018-02-21 12:43:22,273 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink:
Unnamed (1/1),5,Flink Task Threads] took 320 ms.
2018-02-21 12:43:22,316 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 34 ms.
...
repeated code omitted
...
2018-02-21 12:45:20,368 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
2018-02-21 12:45:20,368 INFO
org.apache.flink.runtime.state.DefaultOperatorStateBackend -
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This is the code that first closes the output stream, resulting in the
later flush throwing an exception
***
2018-02-21 12:45:21,426 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:100)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:161)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
***
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This stacktrace is printed immediately before the exception
***
2018-02-21 12:45:22,155 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem
- Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
java.lang.Thread.getStackTrace(Thread.java:1556)
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:93)
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
java.io.DataOutputStream.flush(DataOutputStream.java:123)
java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
2018-02-21 12:45:22,155 INFO org.apache.flink.runtime.taskmanager.Task
- Attempting to fail task externally Source: Custom Source -> Map
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
***
*** vvvv ACTUAL ERROR vvvv ***
***
2018-02-21 12:45:22,157 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source -> Map -> Sink: Unnamed (1/1)
(fddbde91f648fdb9acebf5ac44a32740) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed. Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress}
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed. Thread id: 53 key:
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
at
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:95)
at
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
at
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
at
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
at
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
... 7 more
2018-02-21 12:45:22,172 INFO org.apache.flink.runtime.taskmanager.Task
- Triggering cancellation of task code Source: Custom Source -> Map
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
2018-02-21 12:45:22,179 ERROR
org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut
down timer service
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
...
{code}
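The two annotated stack traces above line up into a simple ordering problem: the part file's underlying stream is closed first (StreamWriterBase.close() at 12:45:21), and the Avro DataFileWriter sitting on top of it is closed afterwards, whose close() flushes its buffer into the now-closed stream (the checkOpen() failure at 12:45:22). A toy sketch of the same shape, using only java.io (the `InnerStream` / `CloseOrderDemo` names are made up for illustration; no Flink, Avro, or Hadoop classes involved):

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Toy model of the failure mode in the traces above: an inner stream that
// rejects writes/flushes once closed, with a buffering layer on top of it.
class InnerStream extends OutputStream {
    private boolean closed;
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();

    @Override
    public void write(int b) throws IOException {
        if (closed) throw new IOException("Output Stream closed");
        delegate.write(b);
    }

    @Override
    public void flush() throws IOException {
        if (closed) throw new IOException("Output Stream closed");
    }

    @Override
    public void close() {
        closed = true;
    }
}

public class CloseOrderDemo {
    public static void main(String[] args) {
        InnerStream inner = new InnerStream();
        BufferedOutputStream outer = new BufferedOutputStream(inner);
        try {
            outer.write(42);  // byte sits in the buffer, not yet in 'inner'
            inner.close();    // first trace: the underlying stream is closed
            outer.close();    // second trace: close() flushes into the closed stream
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Output Stream closed"
        }
    }
}
```

Closing the layers in the opposite order (outer first, so the buffered bytes are flushed while the inner stream is still open) does not throw, which is why the close ordering in AvroKeyValueSinkWriter.close() matters.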
The job was run with:
{code}
${FLINK_HOME}/bin/flink run -m yarn-cluster -yn 1 $YARN_BACKGROUND
/home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar --kafka-brokers
${KAFKA_BROKERS} --kafka-topic ${KAFKA_TOPIC} --kafka-username
${KAFKA_USERNAME} --kafka-password ${KAFKA_PASSWORD} --kafka-group-id
${KAFKA_GROUP_ID} --output-folder s3://${S3_BUCKET}/${S3_FOLDER}
--output-bucket-format-string ${S3_BUCKET_FORMAT_STRING}
{code}
> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.4.0
> Environment: IBM Analytics Engine -
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following
> components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2
> Jupyter Enterprise Gateway 0.5.0
> HBase 1.1.2 *
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 *
> Tez 0.7.0 *
> Pig 0.16.0 *
> Sqoop 1.4.6 *
> Slider 0.92.0 *
> Reporter: chris snow
> Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>     .setWriter(writer)
>     .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!
> The Flink console output is showing an exception being thrown by
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster
> and added some additional logging to the checkOpen() method to log the 'key'
> just before the exception is thrown:
>
> {code:java}
> /*
> * Decompiled with CFR.
> */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key,
>             Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
>             (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(
>             new FileOutputStream(this.backupFile));
>     }
>
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vvvvvv-- Additional logging --vvvvvvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload",
>             (Object)this.key);
>         try {
>             ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload = this.fs.putObject(
>                 this.fs.newPutObjectRequest(this.key, om, this.backupFile));
>             ProgressableProgressListener listener = new
>                 ProgressableProgressListener(this.fs, this.key, upload, this.progress);
>             upload.addProgressListener((ProgressListener)listener);
>             upload.waitForUploadResult();
>             listener.uploadCompleted();
>             this.fs.finishedWrite(this.key);
>         }
>         catch (InterruptedException e) {
>             throw (InterruptedIOException)new
>                 InterruptedIOException(e.toString()).initCause(e);
>         }
>         catch (AmazonClientException e) {
>             throw S3AUtils.translateException("saving output", this.key, e);
>         }
>         finally {
>             if (!this.backupFile.delete()) {
>                 LOG.warn("Could not delete temporary s3a file: {}",
>                     (Object)this.backupFile);
>             }
>             super.close();
>         }
>         LOG.debug("OutputStream for key '{}' upload complete",
>             (Object)this.key);
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b);
>     }
>
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b, off, len);
>     }
>
>     static {
>     }
> }
> {code}
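To make the failure mode concrete, here is a minimal standalone sketch (hypothetical `ClosedCheckingStream` class, not the Hadoop code) of the guard pattern above: `close()` flips the `AtomicBoolean` once via `getAndSet`, so any `flush()` arriving afterwards fails the `checkOpen()` check.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of the S3AOutputStream close/flush guard.
class ClosedCheckingStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException("Output Stream closed");
    }
  }

  void flush() throws IOException {
    checkOpen();          // throws once close() has run
    // real code would flush the backup stream here
  }

  void close() {
    if (closed.getAndSet(true)) {
      return;             // idempotent: a second close is a no-op
    }
    // real code would upload the backup file here
  }
}

class CloseThenFlushDemo {
  public static void main(String[] args) {
    ClosedCheckingStream s = new ClosedCheckingStream();
    s.close();            // the close path runs first...
    try {
      s.flush();          // ...so a later flush hits the guard
    } catch (IOException e) {
      System.out.println(e.getMessage()); // prints "Output Stream closed"
    }
  }
}
```

This matches the ordering in the log below: the stream is closed (and uploaded) first, then a later flush from another code path trips the guard.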
>
> You can see from this additional log output that the S3AOutputStream#close()
> method *appears* to be called before the S3AOutputStream#flush() method:
>
> {code:java}
> 2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - object_delete_requests += 1 -> 3
> vvvvv- close() is called here? -vvvvv
> 2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete
> vvvvv- flush() is called here? -vvvvv
> 2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.
> 2018-02-01 12:42:21,212 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
> 2018-02-01 12:42:21,214 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
> TimerException{java.io.IOException: Output Stream closed}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Output Stream closed
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
>     at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
>     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
>     at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
>     at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
>     at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
>     at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
>     ... 7 more
> {code}
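Note where the flush in the stack trace comes from: Avro's DataFileWriter.close() flushes its underlying stream before closing it. That wrapper pattern can be sketched in isolation (hypothetical `FlushOnCloseWriter` and `GuardedStream` classes, not the Avro or Hadoop code) to show why the exception surfaces inside a close() call when the inner stream was already closed by another path:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for the S3A stream: refuses writes/flushes once closed.
class GuardedStream extends OutputStream {
  private boolean closed;

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Output Stream closed");
    }
  }

  @Override
  public void flush() throws IOException {
    if (closed) {
      throw new IOException("Output Stream closed");
    }
  }

  @Override
  public void close() {
    closed = true;
  }
}

// Hypothetical stand-in for a writer like Avro's DataFileWriter, whose
// close() flushes the inner stream before closing it.
class FlushOnCloseWriter implements Closeable {
  private final OutputStream inner;

  FlushOnCloseWriter(OutputStream inner) {
    this.inner = inner;
  }

  @Override
  public void close() throws IOException {
    inner.flush();   // throws first if inner was already closed elsewhere
    inner.close();
  }
}

class WrapperCloseDemo {
  public static void main(String[] args) throws IOException {
    GuardedStream stream = new GuardedStream();
    FlushOnCloseWriter writer = new FlushOnCloseWriter(stream);
    stream.close();  // inner stream closed out-of-band (as in the log above)
    try {
      writer.close(); // the flush inside close() now fails
    } catch (IOException e) {
      System.out.println(e.getMessage()); // prints "Output Stream closed"
    }
  }
}
```

Under this reading, the fix belongs in whichever component closes the part file twice (or flushes after close), not in the S3A guard itself.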
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)