[https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371403#comment-16371403]

chris snow edited comment on FLINK-8543 at 2/21/18 1:33 PM:
------------------------------------------------------------

Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
      S3AFileSystem fs,
      String key,
      Progressable progress)
      throws IOException {
    this.key = key;
    this.progress = progress;
    this.fs = fs;


    backupFile = fs.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN, conf);

    LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
        key, backupFile);

    this.backupStream = new BufferedOutputStream(
        new FileOutputStream(backupFile));
  }


  // ** print extra debug output **

  void printStackTrace() {
    long threadId = Thread.currentThread().getId();
    StringBuilder sb = new StringBuilder();
    sb.append("Thread id: " + threadId + " key: " + key);
    for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
      sb.append("\n     " + ste);
    }
    // I'm being lazy - log the stacktrace as an error so it will get logged
    // without having to change the logger configuration
    LOG.error(sb.toString());
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
    if (closed.get()) {
      printStackTrace();
      throw new IOException(
              "Output Stream closed.  Thread id: "
              + Thread.currentThread().getId() + " key: " + key);
    }
    }
  }

  @Override
  public void flush() throws IOException {
    checkOpen();
    backupStream.flush();
  }

  @Override
  public void close() throws IOException {

    printStackTrace();

    if (closed.getAndSet(true)) {
      return;
    }

    backupStream.close();
    LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);

    try {
      final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
      Upload upload = fs.putObject(
          fs.newPutObjectRequest(
              key,
              om,
              backupFile));
      ProgressableProgressListener listener =
          new ProgressableProgressListener(fs, key, upload, progress);
      upload.addProgressListener(listener);

      upload.waitForUploadResult();
      listener.uploadCompleted();
      // This will delete unnecessary fake parent directories
      fs.finishedWrite(key);
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException(e.toString())
          .initCause(e);
    } catch (AmazonClientException e) {
      throw translateException("saving output", key , e);
    } finally {
      if (!backupFile.delete()) {
        LOG.warn("Could not delete temporary s3a file: {}", backupFile);
      }
      super.close();
    }
    LOG.debug("OutputStream for key '{}' upload complete", key);
  }

  @Override
  public void write(int b) throws IOException {
    checkOpen();
    backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    checkOpen();
    backupStream.write(b, off, len);
  }

}
{code}
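The instrumented close/flush guard above boils down to an {{AtomicBoolean}}: a second {{close()}} is a silent no-op, but any {{flush()}} after {{close()}} throws. A minimal stand-alone sketch of just that semantics (a hypothetical {{GuardedStream}} class, not the actual S3A type):
{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in reproducing the guard semantics of the instrumented
// S3AOutputStream: close() is idempotent, flush() after close() fails.
class GuardedStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void checkOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Output Stream closed.");
        }
    }

    void flush() throws IOException {
        checkOpen();
        // the real implementation would flush the backup stream here
    }

    void close() {
        if (closed.getAndSet(true)) {
            return; // second close: silent no-op
        }
        // the real implementation would upload the backup file here
    }
}

public class GuardDemo {
    public static void main(String[] args) {
        GuardedStream s = new GuardedStream();
        s.close();        // first close succeeds
        s.close();        // second close: no exception, just returns
        try {
            s.flush();    // flush after close: fails, as in the yarn log below
            System.out.println("no exception");
        } catch (IOException e) {
            System.out.println("flush-after-close threw"); // this branch runs
        }
    }
}
{code}
This is why the double-close in the stack traces below is harmless on its own; it is only the later flush against the closed stream that surfaces as a failure.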
And here is one of the YARN logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - 
--------------------------------------------------------------------------------
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2, 
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  OS current user: clsadmin
2018-02-21 12:43:12,162 WARN  org.apache.hadoop.util.NativeCodeLoader           
            - Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Hadoop version: 2.7.3.2.6.2.0-205
        ... 
        omitted for brevity
        ... 
2018-02-21 12:43:12,267 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - 
--------------------------------------------------------------------------------
2018-02-21 12:43:12,270 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-02-21 12:43:12,705 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Loading configuration from .
        ... 
        omitted for brevity
        ... 
property: taskmanager.memory.preallocate, false
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Current working/local Directory: 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Current working Directory: 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - TM: remoteKeytabPath obtained null
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - TM: remoteKeytabPrincipal obtained null
2018-02-21 12:43:12,779 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Setting directories for temporary file 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,780 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - YARN daemon is running as: clsadmin Yarn client user obtainer: 
clsadmin
2018-02-21 12:43:12,781 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - ResourceID assigned for this container: 
container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,857 INFO  
org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set 
to clsadmin (auth:SIMPLE)
2018-02-21 12:43:13,007 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils            - Trying to 
select the network interface and address to use by connecting to the leading 
JobManager.
2018-02-21 12:43:13,008 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils            - TaskManager 
will try to connect for 10000 milliseconds before falling back to heuristics
2018-02-21 12:43:13,012 INFO  org.apache.flink.runtime.net.ConnectionUtils      
            - Retrieved new target address 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:38012.
2018-02-21 12:43:13,032 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - TaskManager will use hostname/address 
'chs-van-484-dn001.bi.services.us-south.bluemix.net' (172.16.16.130) for 
communication.
2018-02-21 12:43:13,042 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager
2018-02-21 12:43:13,043 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager actor system at 
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907.
2018-02-21 12:43:13,045 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Trying to start actor system at 
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907
2018-02-21 12:43:13,977 INFO  akka.event.slf4j.Slf4jLogger                      
            - Slf4jLogger started
2018-02-21 12:43:14,145 INFO  akka.remote.Remoting                              
            - Starting remoting
2018-02-21 12:43:14,504 INFO  akka.remote.Remoting                              
            - Remoting started; listening on addresses 
:[akka.tcp://[email protected]:38907]
2018-02-21 12:43:14,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Actor system started at 
akka.tcp://[email protected]:38907
2018-02-21 12:43:14,539 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics 
reporter configured, no metrics will be exposed/reported.
2018-02-21 12:43:14,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager actor
2018-02-21 12:43:14,552 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig 
[server address: 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130, server port: 
0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO, 
number of server threads: 1 (manual), number of client threads: 1 (manual), 
server connect backlog: 0 (use Netty's default), client connect timeout (sec): 
120, send/receive buffer size (bytes): 0 (use Netty's default)]
2018-02-21 12:43:14,559 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 10000 ms
2018-02-21 12:43:14,568 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file 
directory 
'/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
 total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,568 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file 
directory 
'/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
 total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,649 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB 
for network buffer pool (number of memory segments: 2048, bytes per segment: 
32768).
2018-02-21 12:43:14,878 WARN  
org.apache.flink.runtime.query.QueryableStateUtils            - Could not load 
Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is 
not in the classpath. Please put the corresponding jar from the opt to the lib 
folder.
2018-02-21 12:43:14,879 WARN  
org.apache.flink.runtime.query.QueryableStateUtils            - Could not load 
Queryable State Server. Probable reason: flink-queryable-state-runtime is not 
in the classpath. Please put the corresponding jar from the opt to the lib 
folder.
2018-02-21 12:43:14,881 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the 
network environment and its components.
2018-02-21 12:43:14,965 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient         - Successful 
initialization (took 69 ms).
2018-02-21 12:43:15,049 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
initialization (took 84 ms). Listening on SocketAddress /172.16.16.130:33982.
2018-02-21 12:43:15,135 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting 
managed memory to 0.7 of the currently free heap space (234 MB), memory will be 
allocated lazily.
2018-02-21 12:43:15,148 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-9dbfdfda-909a-45da-aab7-d82d826edf4b
 for spill files.
2018-02-21 12:43:15,148 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-d4a1da3f-22a4-4a62-9589-050c2343dc59
 for spill files.
2018-02-21 12:43:15,156 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-80d88a48-c391-44ec-b61d-cb293b20fe5a
2018-02-21 12:43:15,156 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-58fa3a64-ae94-44c9-b491-98c5dd89d426
2018-02-21 12:43:15,319 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-e6f21f36-1929-4e9c-a6e6-9225907dc7e4
2018-02-21 12:43:15,320 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-47bc021d-52c9-4f35-abe1-dd9024f08c8f
2018-02-21 12:43:15,344 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Starting TaskManager actor at 
akka://flink/user/taskmanager#-1252244271.
2018-02-21 12:43:15,345 INFO  org.apache.flink.yarn.YarnTaskManager             
            - TaskManager data connection information: 
container_e01_1519207944666_0008_01_000002 @ 
chs-van-484-dn001.bi.services.us-south.bluemix.net (dataPort=33982)
2018-02-21 12:43:15,346 INFO  org.apache.flink.yarn.YarnTaskManager             
            - TaskManager has 1 task slot(s).
2018-02-21 12:43:15,348 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Memory usage stats: [HEAP: 77/406/406 MB, NON HEAP: 35/36/-1 MB 
(used/committed/max)]
2018-02-21 12:43:15,363 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Trying to register at JobManager 
akka.tcp://[email protected]:38012/user/jobmanager
 (attempt 1, timeout: 500 milliseconds)
2018-02-21 12:43:15,711 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Successful registration at JobManager 
(akka.tcp://[email protected]:38012/user/jobmanager),
 starting network stack and library cache.
2018-02-21 12:43:15,723 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Determined BLOB server address to be 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:41971. 
Starting BLOB cache.
2018-02-21 12:43:15,734 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
            - Created BLOB cache storage directory 
/tmp/blobStore-cd4d61de-14b1-454e-9cd6-f91ed292bd04
2018-02-21 12:43:15,741 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
            - Created BLOB cache storage directory 
/tmp/blobStore-b489e08d-4731-4751-9034-567ffef2d9a8
2018-02-21 12:43:16,333 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)
2018-02-21 12:43:16,334 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from CREATED to DEPLOYING.
2018-02-21 12:43:16,335 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Creating FileSystem stream leak safety net for task Source: 
Custom Source -> Map -> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) 
[DEPLOYING]
2018-02-21 12:43:16,347 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Loading JAR files for task Source: Custom Source -> Map -> Sink: 
Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,382 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Registering task at network: Source: Custom Source -> Map -> 
Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,390 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from DEPLOYING to RUNNING.
2018-02-21 12:43:16,401 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - No state 
backend has been configured, using default state backend (Memory / JobManager)
2018-02-21 12:43:18,711 INFO  org.apache.hadoop.conf.Configuration.deprecation  
            - fs.s3a.server-side-encryption-key is deprecated. Instead, use 
fs.s3a.server-side-encryption.key
2018-02-21 12:43:19,631 WARN  
org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The 
short-circuit local reads feature cannot be used because libhadoop cannot be 
loaded.

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** I think this can be ignored.  We do not see errors later on for this file.
*** 

2018-02-21 12:43:20,000 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 44 key: 
user/clsadmin/72566a07-0f62-42b2-84fe-99d2355477f7
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
     
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
     org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:592)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
     
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
     
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
     
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
     
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
     java.lang.Thread.run(Thread.java:745)
2018-02-21 12:43:21,357 INFO  
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to 
restore for the BucketingSink (taskIdx=0).
2018-02-21 12:43:21,412 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
            - class 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does 
not contain a setter for field topic
2018-02-21 12:43:21,412 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
            - class 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is 
not a valid POJO type because not all fields are valid POJO fields.
2018-02-21 12:43:21,422 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No 
restore state for FlinkKafkaConsumer.
2018-02-21 12:43:21,472 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  
            - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = 
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
        check.crcs = true
        client.id =
        ... 
        omitted for brevity
        ... 
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-21 12:43:21,528 INFO  
org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
logged in.
2018-02-21 12:43:21,613 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka version : 0.11.0.2
2018-02-21 12:43:21,613 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:21,938 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 0 will start reading the following 1 partitions from the committed 
group offsets in Kafka: [KafkaTopicPartition{topic='transactions_load', 
partition=0}]
2018-02-21 12:43:21,950 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
Unnamed (1/1),5,Flink Task Threads] took 4 ms.
2018-02-21 12:43:21,968 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  
            - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = 
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
        check.crcs = true
        client.id =
        ... 
        omitted for brevity
        ... 
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-21 12:43:21,974 INFO  
org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
logged in.
2018-02-21 12:43:21,982 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka version : 0.11.0.2
2018-02-21 12:43:21,983 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:22,078 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka08-prod02.messagehub.services.us-south.bluemix.net:9093 (id: 
2147483640 rack: null) for group kafka-flink-iae-streaming-demo.
2018-02-21 12:43:22,273 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
Unnamed (1/1),5,Flink Task Threads] took 320 ms.
2018-02-21 12:43:22,316 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 34 ms.
...
repeated code omitted
...
2018-02-21 12:45:20,368 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
2018-02-21 12:45:20,368 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This is the code that first closes the output stream, resulting in the 
later flush throwing an exception
***

2018-02-21 12:45:21,426 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
     
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
     org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
     
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:100)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:161)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
     
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     java.util.concurrent.FutureTask.run(FutureTask.java:266)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     java.lang.Thread.run(Thread.java:745)

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This stacktrace is printed immediately before the exception
***

2018-02-21 12:45:22,155 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:93)
     org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
     java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
     java.io.DataOutputStream.flush(DataOutputStream.java:123)
     java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
     java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
     
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
     
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
     org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
     org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
     
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     java.util.concurrent.FutureTask.run(FutureTask.java:266)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     java.lang.Thread.run(Thread.java:745)
2018-02-21 12:45:22,155 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to fail task externally Source: Custom Source -> Map 
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).

*** 
*** vvvv ACTUAL ERROR vvvv ***
***

2018-02-21 12:45:22,157 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed.  Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress}
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed.  Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
        at 
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:95)
        at 
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
        at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
        at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
        at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
        at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
        ... 7 more
2018-02-21 12:45:22,172 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source -> Map -> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
2018-02-21 12:45:22,179 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not shut down timer service
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
        at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
...
{code}
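
The traces above boil down to a flush-after-close on a guarded stream: the part file's underlying stream is closed first (via StreamWriterBase.close), and then DataFileWriter.close() flushes the Avro encoder into the already-closed stream, tripping checkOpen(). Here is a minimal standalone sketch of just that failure mode — this is not the Hadoop or Flink code, and the class and method names (FlushAfterCloseDemo, CheckedStream, demo) are made up for illustration; only the checkOpen-on-flush behavior mirrors the patched S3AOutputStream:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

public class FlushAfterCloseDemo {

    // A stream that, like the patched S3AOutputStream, rejects flush()
    // once close() has been called.
    static class CheckedStream extends FilterOutputStream {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        CheckedStream(OutputStream out) {
            super(out);
        }

        private void checkOpen() throws IOException {
            if (closed.get()) {
                throw new IOException("Output Stream closed.");
            }
        }

        @Override
        public void flush() throws IOException {
            checkOpen();
            out.flush();
        }

        @Override
        public void close() throws IOException {
            if (closed.getAndSet(true)) {
                return; // a second close() is a silent no-op
            }
            out.close();
        }
    }

    static String demo() throws IOException {
        CheckedStream s = new CheckedStream(new ByteArrayOutputStream());
        s.close(); // first close, like StreamWriterBase.close()
        try {
            s.flush(); // later flush, like DataFileWriter.close() -> flush()
            return "no exception";
        } catch (IOException e) {
            return "caught: " + e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

Running the sketch prints "caught: Output Stream closed.", matching the TimerException cause in the log: the second closer flushes a stream the first closer already shut.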

Run with:

{code}
${FLINK_HOME}/bin/flink run -m yarn-cluster -yn 1 $YARN_BACKGROUND \
  /home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar \
  --kafka-brokers ${KAFKA_BROKERS} \
  --kafka-topic ${KAFKA_TOPIC} \
  --kafka-username ${KAFKA_USERNAME} \
  --kafka-password ${KAFKA_PASSWORD} \
  --kafka-group-id ${KAFKA_GROUP_ID} \
  --output-folder s3://${S3_BUCKET}/${S3_FOLDER} \
  --output-bucket-format-string ${S3_BUCKET_FORMAT_STRING}
{code}


was (Author: snowch):
Sorry for the delay. I've added the debug statements:
{code:java}
public S3AOutputStream(Configuration conf,
      S3AFileSystem fs,
      String key,
      Progressable progress)
      throws IOException {
    this.key = key;
    this.progress = progress;
    this.fs = fs;


    backupFile = fs.createTmpFileForWrite("output-",
        LocalDirAllocator.SIZE_UNKNOWN, conf);

    LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
        key, backupFile);

    this.backupStream = new BufferedOutputStream(
        new FileOutputStream(backupFile));
  }


  // ** print extra debug output **

  void printStackTrace() {
    long threadId = Thread.currentThread().getId();
    StringBuilder sb = new StringBuilder();
    sb.append("Thread id: " + Thread.currentThread().getId() + " key: " + key);
    for (StackTraceElement ste : Thread.currentThread().getStackTrace()) {
      sb.append("\n     " + ste);
    }
    // I'm being lazy - log the stacktrace as an error so it will get logged 
without having to 
    // change the logger configuration 
    LOG.error(sb.toString());
  }

  /**
   * Check for the filesystem being open.
   * @throws IOException if the filesystem is closed.
   */
  void checkOpen() throws IOException {
    if (closed.get()) {
      printStackTrace();
      throw new IOException(
              "Output Stream closed.  Thread id: " + 
Thread.currentThread().getId() + " key: " + key);
    }
  }

  @Override
  public void flush() throws IOException {
    checkOpen();
    backupStream.flush();
  }

  @Override
  public void close() throws IOException {

    printStackTrace();

    if (closed.getAndSet(true)) {
      return;
    }

    backupStream.close();
    LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);

    try {
      final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
      Upload upload = fs.putObject(
          fs.newPutObjectRequest(
              key,
              om,
              backupFile));
      ProgressableProgressListener listener =
          new ProgressableProgressListener(fs, key, upload, progress);
      upload.addProgressListener(listener);

      upload.waitForUploadResult();
      listener.uploadCompleted();
      // This will delete unnecessary fake parent directories
      fs.finishedWrite(key);
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException(e.toString())
          .initCause(e);
    } catch (AmazonClientException e) {
      throw translateException("saving output", key , e);
    } finally {
      if (!backupFile.delete()) {
        LOG.warn("Could not delete temporary s3a file: {}", backupFile);
      }
      super.close();
    }
    LOG.debug("OutputStream for key '{}' upload complete", key);
  }

  @Override
  public void write(int b) throws IOException {
    checkOpen();
    backupStream.write(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    checkOpen();
    backupStream.write(b, off, len);
  }

}
{code}
And here is one of the yarn logs:
{code:java}
Log Contents:
2018-02-21 12:43:11,323 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - 
--------------------------------------------------------------------------------
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Starting YARN TaskManager (Version: 1.4.0, Rev:3a9d9f2, 
Date:06.12.2017 @ 11:08:40 UTC)
2018-02-21 12:43:11,326 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  OS current user: clsadmin
2018-02-21 12:43:12,162 WARN  org.apache.hadoop.util.NativeCodeLoader           
            - Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Current Hadoop/Kerberos user: clsadmin
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
1.8/25.112-b15
2018-02-21 12:43:12,263 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Maximum heap size: 406 MiBytes
2018-02-21 12:43:12,264 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  JAVA_HOME: /usr/jdk64/jdk1.8.0_112
2018-02-21 12:43:12,266 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            -  Hadoop version: 2.7.3.2.6.2.0-205
        ... 
        omitted for brevity
        ... 
2018-02-21 12:43:12,267 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - 
--------------------------------------------------------------------------------
2018-02-21 12:43:12,270 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-02-21 12:43:12,705 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Loading configuration from .
        ... 
        omitted for brevity
        ... 
property: taskmanager.memory.preallocate, false
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Current working/local Directory: 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Current working Directory: 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - TM: remoteKeytabPath obtained null
2018-02-21 12:43:12,778 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - TM: remoteKeytabPrincipal obtained null
2018-02-21 12:43:12,779 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - Setting directories for temporary file 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008,/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008
2018-02-21 12:43:12,780 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - YARN daemon is running as: clsadmin Yarn client user obtainer: 
clsadmin
2018-02-21 12:43:12,781 INFO  org.apache.flink.yarn.YarnTaskManagerRunner       
            - ResourceID assigned for this container: 
container_e01_1519207944666_0008_01_000002
2018-02-21 12:43:12,857 INFO  
org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set 
to clsadmin (auth:SIMPLE)
2018-02-21 12:43:13,007 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils            - Trying to 
select the network interface and address to use by connecting to the leading 
JobManager.
2018-02-21 12:43:13,008 INFO  
org.apache.flink.runtime.util.LeaderRetrievalUtils            - TaskManager 
will try to connect for 10000 milliseconds before falling back to heuristics
2018-02-21 12:43:13,012 INFO  org.apache.flink.runtime.net.ConnectionUtils      
            - Retrieved new target address 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:38012.
2018-02-21 12:43:13,032 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - TaskManager will use hostname/address 
'chs-van-484-dn001.bi.services.us-south.bluemix.net' (172.16.16.130) for 
communication.
2018-02-21 12:43:13,042 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager
2018-02-21 12:43:13,043 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager actor system at 
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907.
2018-02-21 12:43:13,045 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Trying to start actor system at 
chs-van-484-dn001.bi.services.us-south.bluemix.net:38907
2018-02-21 12:43:13,977 INFO  akka.event.slf4j.Slf4jLogger                      
            - Slf4jLogger started
2018-02-21 12:43:14,145 INFO  akka.remote.Remoting                              
            - Starting remoting
2018-02-21 12:43:14,504 INFO  akka.remote.Remoting                              
            - Remoting started; listening on addresses 
:[akka.tcp://[email protected]:38907]
2018-02-21 12:43:14,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Actor system started at 
akka.tcp://[email protected]:38907
2018-02-21 12:43:14,539 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics 
reporter configured, no metrics will be exposed/reported.
2018-02-21 12:43:14,544 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
            - Starting TaskManager actor
2018-02-21 12:43:14,552 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig 
[server address: 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130, server port: 
0, ssl enabled: false, memory segment size (bytes): 32768, transport type: NIO, 
number of server threads: 1 (manual), number of client threads: 1 (manual), 
server connect backlog: 0 (use Netty's default), client connect timeout (sec): 
120, send/receive buffer size (bytes): 0 (use Netty's default)]
2018-02-21 12:43:14,559 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have 
a max timeout of 10000 ms
2018-02-21 12:43:14,568 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file 
directory 
'/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
 total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,568 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file 
directory 
'/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008':
 total 299 GB, usable 298 GB (99.67% usable)
2018-02-21 12:43:14,649 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB 
for network buffer pool (number of memory segments: 2048, bytes per segment: 
32768).
2018-02-21 12:43:14,878 WARN  
org.apache.flink.runtime.query.QueryableStateUtils            - Could not load 
Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is 
not in the classpath. Please put the corresponding jar from the opt to the lib 
folder.
2018-02-21 12:43:14,879 WARN  
org.apache.flink.runtime.query.QueryableStateUtils            - Could not load 
Queryable State Server. Probable reason: flink-queryable-state-runtime is not 
in the classpath. Please put the corresponding jar from the opt to the lib 
folder.
2018-02-21 12:43:14,881 INFO  
org.apache.flink.runtime.io.network.NetworkEnvironment        - Starting the 
network environment and its components.
2018-02-21 12:43:14,965 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient         - Successful 
initialization (took 69 ms).
2018-02-21 12:43:15,049 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
initialization (took 84 ms). Listening on SocketAddress /172.16.16.130:33982.
2018-02-21 12:43:15,135 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Limiting 
managed memory to 0.7 of the currently free heap space (234 MB), memory will be 
allocated lazily.
2018-02-21 12:43:15,148 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-9dbfdfda-909a-45da-aab7-d82d826edf4b
 for spill files.
2018-02-21 12:43:15,148 INFO  
org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager 
uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-io-d4a1da3f-22a4-4a62-9589-050c2343dc59
 for spill files.
2018-02-21 12:43:15,156 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-80d88a48-c391-44ec-b61d-cb293b20fe5a
2018-02-21 12:43:15,156 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-58fa3a64-ae94-44c9-b491-98c5dd89d426
2018-02-21 12:43:15,319 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk1/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-e6f21f36-1929-4e9c-a6e6-9225907dc7e4
2018-02-21 12:43:15,320 INFO  org.apache.flink.runtime.filecache.FileCache      
            - User file cache uses directory 
/disk2/hadoop-swap/yarn/local/usercache/clsadmin/appcache/application_1519207944666_0008/flink-dist-cache-47bc021d-52c9-4f35-abe1-dd9024f08c8f
2018-02-21 12:43:15,344 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Starting TaskManager actor at 
akka://flink/user/taskmanager#-1252244271.
2018-02-21 12:43:15,345 INFO  org.apache.flink.yarn.YarnTaskManager             
            - TaskManager data connection information: 
container_e01_1519207944666_0008_01_000002 @ 
chs-van-484-dn001.bi.services.us-south.bluemix.net (dataPort=33982)
2018-02-21 12:43:15,346 INFO  org.apache.flink.yarn.YarnTaskManager             
            - TaskManager has 1 task slot(s).
2018-02-21 12:43:15,348 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Memory usage stats: [HEAP: 77/406/406 MB, NON HEAP: 35/36/-1 MB 
(used/committed/max)]
2018-02-21 12:43:15,363 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Trying to register at JobManager 
akka.tcp://[email protected]:38012/user/jobmanager
 (attempt 1, timeout: 500 milliseconds)
2018-02-21 12:43:15,711 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Successful registration at JobManager 
(akka.tcp://[email protected]:38012/user/jobmanager),
 starting network stack and library cache.
2018-02-21 12:43:15,723 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Determined BLOB server address to be 
chs-van-484-dn001.bi.services.us-south.bluemix.net/172.16.16.130:41971. 
Starting BLOB cache.
2018-02-21 12:43:15,734 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
            - Created BLOB cache storage directory 
/tmp/blobStore-cd4d61de-14b1-454e-9cd6-f91ed292bd04
2018-02-21 12:43:15,741 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
            - Created BLOB cache storage directory 
/tmp/blobStore-b489e08d-4731-4751-9034-567ffef2d9a8
2018-02-21 12:43:16,333 INFO  org.apache.flink.yarn.YarnTaskManager             
            - Received task Source: Custom Source -> Map -> Sink: Unnamed (1/1)
2018-02-21 12:43:16,334 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from CREATED to DEPLOYING.
2018-02-21 12:43:16,335 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Creating FileSystem stream leak safety net for task Source: 
Custom Source -> Map -> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) 
[DEPLOYING]
2018-02-21 12:43:16,347 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Loading JAR files for task Source: Custom Source -> Map -> Sink: 
Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,382 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Registering task at network: Source: Custom Source -> Map -> 
Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740) [DEPLOYING].
2018-02-21 12:43:16,390 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from DEPLOYING to RUNNING.
2018-02-21 12:43:16,401 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - No state 
backend has been configured, using default state backend (Memory / JobManager)
2018-02-21 12:43:18,711 INFO  org.apache.hadoop.conf.Configuration.deprecation  
            - fs.s3a.server-side-encryption-key is deprecated. Instead, use 
fs.s3a.server-side-encryption.key
2018-02-21 12:43:19,631 WARN  
org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The 
short-circuit local reads feature cannot be used because libhadoop cannot be 
loaded.

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** 

2018-02-21 12:43:20,000 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 44 key: 
user/clsadmin/72566a07-0f62-42b2-84fe-99d2355477f7
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
     
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
     org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.reflectTruncate(BucketingSink.java:592)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:362)
     
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
     
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
     
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
     
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
     
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
     java.lang.Thread.run(Thread.java:745)
2018-02-21 12:43:21,357 INFO  
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to 
restore for the BucketingSink (taskIdx=0).
2018-02-21 12:43:21,412 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
            - class 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does 
not contain a setter for field topic
2018-02-21 12:43:21,412 INFO  org.apache.flink.api.java.typeutils.TypeExtractor 
            - class 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is 
not a valid POJO type because not all fields are valid POJO fields.
2018-02-21 12:43:21,422 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - No 
restore state for FlinkKafkaConsumer.
2018-02-21 12:43:21,472 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  
            - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = 
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
        check.crcs = true
        client.id =
        ... 
        omitted for brevity
        ... 
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-21 12:43:21,528 INFO  
org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
logged in.
2018-02-21 12:43:21,613 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka version : 0.11.0.2
2018-02-21 12:43:21,613 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:21,938 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 0 will start reading the following 1 partitions from the committed 
group offsets in Kafka: [KafkaTopicPartition{topic='transactions_load', 
partition=0}]
2018-02-21 12:43:21,950 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
Unnamed (1/1),5,Flink Task Threads] took 4 ms.
2018-02-21 12:43:21,968 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  
            - ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = 
[kafka02-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka04-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka03-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka01-prod02.messagehub.services.us-south.bluemix.net:9093, 
kafka05-prod02.messagehub.services.us-south.bluemix.net:9093]
        check.crcs = true
        client.id =
        ... 
        omitted for brevity
        ... 
        ssl.truststore.type = JKS
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

2018-02-21 12:43:21,974 INFO  
org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
logged in.
2018-02-21 12:43:21,982 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka version : 0.11.0.2
2018-02-21 12:43:21,983 INFO  org.apache.kafka.common.utils.AppInfoParser       
            - Kafka commitId : 73be1e1168f91ee2
2018-02-21 12:43:22,078 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka08-prod02.messagehub.services.us-south.bluemix.net:9093 (id: 
2147483640 rack: null) for group kafka-flink-iae-streaming-demo.
2018-02-21 12:43:22,273 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous 
part) in thread Thread[Async calls on Source: Custom Source -> Map -> Sink: 
Unnamed (1/1),5,Flink Task Threads] took 320 ms.
2018-02-21 12:43:22,316 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 34 ms.
...
repeated code omitted
...
2018-02-21 12:45:20,368 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.
2018-02-21 12:45:20,368 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous 
part) in thread Thread[pool-4-thread-1,5,Flink Task Threads] took 0 ms.

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This is the code that first closes the output stream, resulting in the 
later flush throwing an exception
***

2018-02-21 12:45:21,426 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:108)
     
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
     org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
     
org.apache.flink.streaming.connectors.fs.StreamWriterBase.close(StreamWriterBase.java:100)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:161)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
     
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     java.util.concurrent.FutureTask.run(FutureTask.java:266)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     java.lang.Thread.run(Thread.java:745)

*** 
*** vvvv NOT AN ERROR - JUST LOGGED AS ERROR vvvv ***
*** This stacktrace is printed immediately before the exception
***

2018-02-21 12:45:22,155 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem            
            - Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
     java.lang.Thread.getStackTrace(Thread.java:1556)
     
org.apache.hadoop.fs.s3a.S3AOutputStream.printStackTrace(S3AOutputStream.java:81)
     org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:93)
     org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
     java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
     java.io.DataOutputStream.flush(DataOutputStream.java:123)
     java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
     java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
     
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
     
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
     org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
     org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
     
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
     
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
     
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
     java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     java.util.concurrent.FutureTask.run(FutureTask.java:266)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
     
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
     
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     java.lang.Thread.run(Thread.java:745)
2018-02-21 12:45:22,155 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Attempting to fail task externally Source: Custom Source -> Map 
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).

*** 
*** vvvv ACTUAL ERROR vvvv ***
***

2018-02-21 12:45:22,157 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source -> Map -> Sink: Unnamed (1/1) 
(fddbde91f648fdb9acebf5ac44a32740) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed.  Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress}
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed.  Thread id: 53 key: 
transactions_load_20180221/2018-02-21--1243/_part-0-0.in-progress
        at 
org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:95)
        at 
org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:101)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
        at 
org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
        at 
org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
        at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
        at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
        at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
        at 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
        ... 7 more
2018-02-21 12:45:22,172 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Triggering cancellation of task code Source: Custom Source -> Map 
-> Sink: Unnamed (1/1) (fddbde91f648fdb9acebf5ac44a32740).
2018-02-21 12:45:22,179 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not shut 
down timer service
java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
        at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
        at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
...
{code}

Run with:

{code}
${FLINK_HOME}/bin/flink run -m yarn-cluster -yn 1 $YARN_BACKGROUND 
/home/clsadmin/messagehub-to-s3-1.0-SNAPSHOT.jar   --kafka-brokers 
${KAFKA_BROKERS}   --kafka-topic ${KAFKA_TOPIC}   --kafka-username 
${KAFKA_USERNAME}   --kafka-password ${KAFKA_PASSWORD}   --kafka-group-id 
${KAFKA_GROUP_ID}   --output-folder s3://${S3_BUCKET}/${S3_FOLDER}   
--output-bucket-format-string ${S3_BUCKET_FORMAT_STRING}
{code}
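The stack traces above boil down to an ordering problem: BucketingSink's close path ends up calling close() on the S3A stream first, and a later flush() (triggered via Avro's DataFileWriter.close()) then trips the closed-flag guard. A minimal, self-contained sketch of that guard pattern (a hypothetical GuardedStream class, not the real Hadoop code) reproduces the same IOException:

{code:java}
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for S3AOutputStream's closed-flag guard.
class GuardedStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void checkOpen() throws IOException {
        if (closed.get()) {
            throw new IOException("Output Stream closed");
        }
    }

    public void flush() throws IOException {
        checkOpen();
        // real class flushes the backing BufferedOutputStream here
    }

    public void close() {
        if (closed.getAndSet(true)) {
            return; // second close is a no-op
        }
        // real class uploads the backup file to S3 here
    }
}

public class Repro {
    public static void main(String[] args) {
        GuardedStream s = new GuardedStream();
        s.close();       // sink closes the part file first...
        try {
            s.flush();   // ...then a writer's close() flushes again
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Output Stream closed"
        }
    }
}
{code}

So any code path that can reach flush() after close() on the same stream will hit this, regardless of how the S3A upload itself went.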

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8543
>                 URL: https://issues.apache.org/jira/browse/FLINK-8543
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.4.0
>         Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>            Reporter: chris snow
>            Priority: Blocker
>             Fix For: 1.5.0
>
>         Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>          .setWriter(writer)
>          .setBucketer(new DateTimeBucketer<Tuple2<String, 
> Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
>     private final OutputStream backupStream;
>     private final File backupFile;
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final String key;
>     private final Progressable progress;
>     private final S3AFileSystem fs;
>     public static final Logger LOG = S3AFileSystem.LOG;
>     public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
>         this.key = key;
>         this.progress = progress;
>         this.fs = fs;
>         this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
>         LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
>         this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
>     }
>     void checkOpen() throws IOException {
>         if (!this.closed.get()) return;
>         // vvvvvv-- Additional logging --vvvvvvv
>         LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
>         throw new IOException("Output Stream closed");
>     }
>     @Override
>     public void flush() throws IOException {
>         this.checkOpen();
>         this.backupStream.flush();
>     }
>     @Override
>     public void close() throws IOException {
>         if (this.closed.getAndSet(true)) {
>             return;
>         }
>         this.backupStream.close();
>         LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
>         try {
>             ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
>             Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
>             ProgressableProgressListener listener = new 
> ProgressableProgressListener(this.fs, this.key, upload, this.progress);
>             upload.addProgressListener((ProgressListener)listener);
>             upload.waitForUploadResult();
>             listener.uploadCompleted();
>             this.fs.finishedWrite(this.key);
>         }
>         catch (InterruptedException e) {
>             throw (InterruptedIOException)new 
> InterruptedIOException(e.toString()).initCause(e);
>         }
>         catch (AmazonClientException e) {
>             throw S3AUtils.translateException("saving output", this.key, e);
>         }
>         finally {
>             if (!this.backupFile.delete()) {
>                 LOG.warn("Could not delete temporary s3a file: {}", 
> (Object)this.backupFile);
>             }
>             super.close();
>         }
>         LOG.debug("OutputStream for key '{}' upload complete", 
> (Object)this.key);
>     }
>     @Override
>     public void write(int b) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b);
>     }
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         this.checkOpen();
>         this.backupStream.write(b, off, len);
>     }
>     static {
>     }
> }
> {code}
>  
> You can see from this additional log output that the S3AOutputStream#close() 
> method **appears** to be called before the S3AOutputStream#flush() method:
>  
> {code:java}
> 2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 
> 128497 bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress 
>               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 
> bytes
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               - Finished write to 
> landingzone/2018-02-01--1240/_part-0-0.in-progress
> 2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               - object_delete_requests += 1  ->  3
> vvvvv- close() is called here? -vvvvv
> 2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem          
>               
> - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' 
> upload complete
> vvvvv- flush() is called here? -vvvvv
> 2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem          
>               
> - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' 
> closed.
> 2018-02-01 12:42:21,212 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Attempting to fail task externally Source: Custom Source -> Map -> Sink: 
> Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
> 2018-02-01 12:42:21,214 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Map -> Sink: Unnamed (1/2) 
> (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
> TimerException{java.io.IOException: Output Stream closed}
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Output Stream closed
>       at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
>       at 
> org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
>       at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>       at java.io.DataOutputStream.flush(DataOutputStream.java:123)
>       at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>       at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
>       at 
> org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
>       at 
> org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
>       at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
>       at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
>       at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
>       at 
> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
>       at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
>       at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
>       ... 7 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
