[ 
https://issues.apache.org/jira/browse/FLINK-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963327#comment-14963327
 ] 

Maximilian Michels commented on FLINK-2865:
-------------------------------------------

Thanks for reporting. The maximum direct memory is currently set to 
{{taskmanager.heap.mb}}. The number of network buffers is not taken into 
account. This may cause problems in off-heap mode with a large fraction of 
managed memory (0.9 in your case) and a large number of network buffers.
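
To make the mismatch concrete, here is a rough sketch of the arithmetic using 
only numbers from the quoted log and configuration. It assumes that the network 
buffers are allocated as direct memory as well, which the reporter's 
observation (startup succeeds with 6 GB of buffers) suggests but which is not 
confirmed here. It also assumes the managed memory size stays the same when the 
buffers are halved. The variable names are illustrative only.

```python
MIB = 1024 * 1024

# Values copied from the quoted TaskManager log and flink-conf.yaml.
max_direct_mib = 53248        # -XX:MaxDirectMemorySize=53248M
segments = 393216             # taskmanager.network.numberOfBuffers
segment_bytes = 32768         # memory segment size (bytes)
managed_bytes = 48172092966   # size of the failed off-heap allocation

limit_bytes = max_direct_mib * MIB
network_bytes = segments * segment_bytes

# Assumption: network buffers and managed memory share the direct memory limit.
requested_bytes = network_bytes + managed_bytes
halved_bytes = network_bytes // 2 + managed_bytes

print(f"network buffers:   {network_bytes // MIB} MiB")
print(f"managed memory:    {managed_bytes // MIB} MiB")
print(f"direct limit:      {limit_bytes // MIB} MiB")
print(f"12 GB buffers fit: {requested_bytes <= limit_bytes}")
print(f"6 GB buffers fit:  {halved_bytes <= limit_bytes}")
```

Under these assumptions the 12 GB setup asks for more direct memory than the 
limit allows, while the 6 GB setup stays below it, which matches the report.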

Previously, we had a version that calculated the max direct memory size by 
taking the network buffers into account. However, that proved to be 
difficult because the network memory consumption depends on the Netty 
library and the number of network channels. I think the total amount of 
network memory can never exceed 2 * (network buffer memory). In this case all 
network buffers would be inside the Netty buffer pool. I am, however, not very 
familiar with the Netty library and how it allocates memory.
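
As a back-of-the-envelope sketch of what such a calculation could look like: 
the helper below is hypothetical and not Flink code. It simply adds the managed 
memory to the network buffer memory multiplied by the factor of 2 guessed 
above. That factor is an unverified assumption about Netty, and the input 
numbers are taken from the quoted log.

```python
MIB = 1024 * 1024


def estimate_max_direct_bytes(managed_bytes, network_bytes, netty_factor=2):
    """Hypothetical upper bound for the direct memory limit.

    netty_factor=2 reflects the unverified guess that Netty may hold at most
    one extra copy of every network buffer in its own pool.
    """
    return managed_bytes + netty_factor * network_bytes


# Numbers from the quoted log: 393216 segments of 32768 bytes each, and the
# failed off-heap allocation of 48172092966 bytes.
network_bytes = 393216 * 32768
managed_bytes = 48172092966

estimate = estimate_max_direct_bytes(managed_bytes, network_bytes)
configured = 53248 * MIB  # -XX:MaxDirectMemorySize=53248M

print(f"estimated limit:  {estimate // MIB} MiB")
print(f"configured limit: {configured // MIB} MiB")
```

With these inputs the estimate is well above the configured limit, so a 
calculation along these lines would have let the TaskManager start, provided 
the machine actually has that much memory available.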

In the meantime, I propose setting the maximum direct memory to 
{{Long.MAX_VALUE}}. This will get rid of the OutOfMemoryErrors. It only lifts 
the limit; we will still allocate only as much memory as needed.

> OutOfMemory error (Direct buffer memory)
> ----------------------------------------
>
>                 Key: FLINK-2865
>                 URL: https://issues.apache.org/jira/browse/FLINK-2865
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: 0.10
>            Reporter: Greg Hogan
>
> I see the following TaskManager error when using off-heap memory and a 
> relatively high number of network buffers. Setting 
> {{taskmanager.memory.off-heap: false}} or halving the number of network 
> buffers (6 GB instead of 12 GB) results in a successful start.
> {noformat}
> 18:17:25,912 WARN  org.apache.hadoop.util.NativeCodeLoader                    
>    - Unable to load native-hadoop library for your platform... using 
> builtin-java classes where applicable
> 18:17:26,024 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - 
> --------------------------------------------------------------------------------
> 18:17:26,024 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  Starting TaskManager (Version: 0.10-SNAPSHOT, Rev:d047ddb, 
> Date:18.10.2015 @ 08:54:59 UTC)
> 18:17:26,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  Current user: ec2-user
> 18:17:26,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.60-b23
> 18:17:26,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  Maximum heap size: 5104 MiBytes
> 18:17:26,025 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  JAVA_HOME: /usr/java/latest
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  Hadoop version: 2.3.0
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  JVM Options:
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     -Xms5325M
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     -Xmx5325M
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     -XX:MaxDirectMemorySize=53248M
> 18:17:26,026 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     
> -Dlog.file=/home/ec2-user/flink/log/flink-ec2-user-taskmanager-0-ip-10-0-98-3.log
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     -Dlog4j.configuration=file:/home/ec2-user/flink/conf/log4j.properties
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     
> -Dlogback.configurationFile=file:/home/ec2-user/flink/conf/logback.xml
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -  Program Arguments:
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     --configDir
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     /home/ec2-user/flink/conf
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     --streamingMode
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    -     batch
> 18:17:26,027 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - 
> --------------------------------------------------------------------------------
> 18:17:26,033 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Maximum number of open file descriptors is 1048576
> 18:17:26,051 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Loading configuration from /home/ec2-user/flink/conf
> 18:17:26,079 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Security is not enabled. Starting non-authenticated TaskManager.
> 18:17:26,094 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils         
>    - Trying to select the network interface and address to use by connecting 
> to the leading JobManager.
> 18:17:26,094 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils         
>    - TaskManager will try to connect for 10000 milliseconds before falling 
> back to heuristics
> 18:17:26,097 INFO  org.apache.flink.runtime.net.ConnectionUtils               
>    - Retrieved new target address /127.0.0.1:6123.
> 18:17:26,461 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - TaskManager will use hostname/address 'ip-10-0-98-3' (10.0.98.3) for 
> communication.
> 18:17:26,462 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Starting TaskManager in streaming mode BATCH_ONLY
> 18:17:26,462 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Starting TaskManager actor system at 10.0.98.3:0
> 18:17:26,735 INFO  akka.event.slf4j.Slf4jLogger                               
>    - Slf4jLogger started
> 18:17:26,767 INFO  Remoting                                                   
>    - Starting remoting
> 18:17:26,877 INFO  Remoting                                                   
>    - Remoting started; listening on addresses 
> :[akka.tcp://flink@10.0.98.3:47484]
> 18:17:26,881 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Starting TaskManager actor
> 18:17:26,925 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig      
>    - NettyConfig [server address: ip-10-0-98-3/10.0.98.3, server port: 45728, 
> memory segment size (bytes): 32768, transport type: NIO, number of server 
> threads: 0 (use Netty's default), number of client threads: 0 (use Netty's 
> default), server connect backlog: 0 (use Netty's default), client connect 
> timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
> 18:17:26,927 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Messages between TaskManager and JobManager have a max timeout of 100000 
> milliseconds
> 18:17:26,931 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Temporary file directory '/volumes/xvdb/tmp': total 319 GB, usable 319 
> GB (100.00% usable)
> 18:17:26,931 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Temporary file directory '/volumes/xvdc/tmp': total 319 GB, usable 319 
> GB (100.00% usable)
> 18:17:32,194 INFO  
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 
> 12288 MB for network buffer pool (number of memory segments: 393216, bytes 
> per segment: 32768).
> 18:17:32,195 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Using 0.9 of the maximum memory size for Flink managed off-heap memory 
> (45940 MB).
> 18:17:50,371 ERROR org.apache.flink.runtime.taskmanager.TaskManager           
>    - Error while starting up taskManager
> java.lang.Exception: OutOfMemory error (Direct buffer memory) while 
> allocating the TaskManager off-heap memory (48172092966 bytes). Try 
> increasing the maximum direct memory (-XX:MaxDirectMemorySize)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1633)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1460)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1325)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.main(TaskManager.scala:1235)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.main(TaskManager.scala)
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>       at java.nio.Bits.reserveMemory(Bits.java:658)
>       at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>       at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>       at 
> org.apache.flink.runtime.memory.MemoryManager$HybridOffHeapMemoryPool.<init>(MemoryManager.java:661)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.<init>(MemoryManager.java:166)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1618)
>       ... 4 more
> 18:17:50,374 ERROR org.apache.flink.runtime.taskmanager.TaskManager           
>    - Failed to run TaskManager.
> java.lang.Exception: OutOfMemory error (Direct buffer memory) while 
> allocating the TaskManager off-heap memory (48172092966 bytes). Try 
> increasing the maximum direct memory (-XX:MaxDirectMemorySize)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1633)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1460)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1325)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.main(TaskManager.scala:1235)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.main(TaskManager.scala)
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>       at java.nio.Bits.reserveMemory(Bits.java:658)
>       at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>       at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>       at 
> org.apache.flink.runtime.memory.MemoryManager$HybridOffHeapMemoryPool.<init>(MemoryManager.java:661)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.<init>(MemoryManager.java:166)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1618)
>       ... 4 more
> {noformat}
> {noformat}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> jobmanager.web.history: 50
> taskmanager.debug.memory.startLogThread: true
> taskmanager.debug.memory.logIntervalMs: 1000
> taskmanager.memory.fraction: 0.9
> taskmanager.memory.off-heap: true
> taskmanager.runtime.hashjoin-bloom-filters: true
> taskmanager.runtime.max-fan: 1024
> #==============================================================================
> # Common
> #==============================================================================
> # The host on which the JobManager runs. Only used in non-high-availability 
> mode.
> # The JobManager process will use this hostname to bind the listening servers 
> to.
> # The TaskManagers will try to connect to the JobManager on that host.
> jobmanager.rpc.address: localhost
> # The port where the JobManager's main actor system listens for messages.
> jobmanager.rpc.port: 6123
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 1024
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 53248
> # The number of task slots that each TaskManager offers. Each slot runs one 
> parallel pipeline.
> taskmanager.numberOfTaskSlots: 32
> # The parallelism used for programs that did not specify any other 
> parallelism.
> parallelism.default: 32
> #==============================================================================
> # Web Frontend
> #==============================================================================
> # The port under which the web-based runtime monitor listens.
> # A value of -1 deactivates the web server.
> jobmanager.web.port: 8081
> # The port under which the standalone web client
> # (for job upload and submit) listens.
> webclient.port: 8080
> # Temporary: Uncomment this to be able to use the new web frontend
> jobmanager.new-web-frontend: true
> #==============================================================================
> # Streaming state checkpointing
> #==============================================================================
> # The backend that will be used to store operator state checkpoints if 
> # checkpointing is enabled. 
> #
> # Supported backends: jobmanager, filesystem
> state.backend: jobmanager
> # Directory for storing checkpoints in a flink supported filesystem
> # Note: State backend must be accessible from the JobManager, use file://
> # only for local setups. 
> #
> # state.backend.fs.checkpointdir: hdfs://checkpoints
> #==============================================================================
> # Advanced
> #==============================================================================
> # The number of buffers for the network stack.
> taskmanager.network.numberOfBuffers: 393216
> # Directories for temporary files.
> #
> # Add a delimited list for multiple directories, using the system directory
> # delimiter (colon ':' on unix) or a comma, e.g.:
> #     /data1/tmp:/data2/tmp:/data3/tmp
> #
> # Note: Each directory entry is read from and written to by a different I/O
> # thread. You can include the same directory multiple times in order to create
> # multiple I/O threads against that directory. This is for example relevant 
> for
> # high-throughput RAIDs.
> #
> # If not specified, the system-specific Java temporary directory 
> (java.io.tmpdir
> # property) is taken.
> taskmanager.tmp.dirs: /volumes/xvdb/tmp:/volumes/xvdc/tmp
> # Path to the Hadoop configuration directory.
> #
> # This configuration is used when writing into HDFS. Unless specified 
> otherwise,
> # HDFS file creation will use HDFS default settings with respect to 
> block-size,
> # replication factor, etc.
> #
> # You can also directly specify the paths to hdfs-default.xml and 
> hdfs-site.xml
> # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
> #
> # fs.hdfs.hadoopconf: /path/to/hadoop/conf/
> #==============================================================================
> # High Availability
> #==============================================================================
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form
> # "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],..."
> #
> # recovery.mode: zookeeper
> #
> # ha.zookeeper.quorum: localhost
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
