gonzojive opened a new issue, #21817:
URL: https://github.com/apache/beam/issues/21817

   ### What happened?
   
   
   I am not a beam developer, just a user.
   
   I wrote a Beam pipeline to download zip files over HTTP and write them to 
disk in a record-oriented format (Riegeli).
   
   - The direct runner stores everything in memory, so it has no hope of 
avoiding OOM. That's fine, I guess.
   - Even with Flink, I can't seem to avoid running out of memory. I have 35 GB 
free. The Go process eventually dies after getting up to 7GB of consumption, 
while Flink is presumably consuming the rest.
   
   Would it be possible to do something crude like return "Resource exhausted" 
codes from the Go harness after a certain memory threshold is reached? Would 
the Flink runner be smart enough to reduce the request rate and memory pressure?
   
   
   Launch commands:
   
   
   Beam pipeline:
   
   ```shell
   bazel run //:download_stuff -- --runner flink --endpoint localhost:8099 
"--output_riegeli" "/tmp/[email protected]" "--environment_type" "LOOPBACK" 
--alsologtostderr --beam_strict 
   ```
   
   I can reproduce if the authors think that would be useful.
   
   Launch commands:
   
   Flink
   ```shell
   bazel run :job -- "--flink-conf-dir" /home/red/flink/
   ```
   
   Where :job is defined by
   
   ```starlark
   java_binary(
       name = "job",
       main_class = "org.apache.beam.runners.flink.FlinkJobServerDriver",
       runtime_deps = [
           # Obtained by looking at results of
           # bazel query @maven//...
           "@maven//:org_apache_beam_beam_runners_flink_1_14",
           "@maven//:org_slf4j_slf4j_api",
           "@maven//:org_slf4j_slf4j_simple",
           "@maven//:org_apache_flink_flink_runtime_web_2_12",
       ],
   )
   ```
   
   flink-conf.yaml
   
   ```yaml
   
################################################################################
   #  Licensed to the Apache Software Foundation (ASF) under one
   #  or more contributor license agreements.  See the NOTICE file
   #  distributed with this work for additional information
   #  regarding copyright ownership.  The ASF licenses this file
   #  to you under the Apache License, Version 2.0 (the
   #  "License"); you may not use this file except in compliance
   #  with the License.  You may obtain a copy of the License at
   #
   #      http://www.apache.org/licenses/LICENSE-2.0
   #
   #  Unless required by applicable law or agreed to in writing, software
   #  distributed under the License is distributed on an "AS IS" BASIS,
   #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   #  See the License for the specific language governing permissions and
   # limitations under the License.
   
################################################################################
   
   
   
#==============================================================================
   # Common
   
#==============================================================================
   
   # The external address of the host on which the JobManager runs and can be
   # reached by the TaskManagers and any clients which want to connect. This 
setting
   # is only used in Standalone mode and may be overwritten on the JobManager 
side
   # by specifying the --host <hostname> parameter of the bin/jobmanager.sh 
executable.
   # In high availability mode, if you use the bin/start-cluster.sh script and 
setup
   # the conf/masters file, this will be taken care of automatically. Yarn
   # automatically configure the host name based on the hostname of the node 
where the
   # JobManager runs.
   
   # jobmanager.rpc.address: localhost
   
   # The RPC port where the JobManager is reachable.
   
   # jobmanager.rpc.port: 6123
   
   # The host interface the JobManager will bind to. My default, this is 
localhost, and will prevent
   # the JobManager from communicating outside the machine/container it is 
running on.
   # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
   # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
   #
   # To enable this, set the bind-host address to one that has access to an 
outside facing network
   # interface, such as 0.0.0.0.
   
   # jobmanager.bind-host: localhost
   
   
   # The total process memory size for the JobManager.
   #
   # Note this accounts for all memory usage within the JobManager process, 
including JVM metaspace and other overhead.
   
   jobmanager.memory.process.size: 2gb
   
   # The host interface the TaskManager will bind to. By default, this is 
localhost, and will prevent
   # the TaskManager from communicating outside the machine/container it is 
running on.
   # On YARN this setting will be ignored if it is set to 'localhost', 
defaulting to 0.0.0.0.
   # On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
   #
   # To enable this, set the bind-host address to one that has access to an 
outside facing network
   # interface, such as 0.0.0.0.
   
   # taskmanager.bind-host: localhost
   
   # The address of the host on which the TaskManager runs and can be reached 
by the JobManager and
   # other TaskManagers. If not specified, the TaskManager will try different 
strategies to identify
   # the address.
   #
   # Note this address needs to be reachable by the JobManager and forward 
traffic to one of
   # the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
   #
   # Note also that unless all TaskManagers are running on the same machine, 
this address needs to be
   # configured separately for each TaskManager.
   
   # taskmanager.host: localhost
   
   # The total process memory size for the TaskManager.
   #
   # Note this accounts for all memory usage within the TaskManager process, 
including JVM metaspace and other overhead.
   #
   # 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#task-operator-heap-memory
   
   taskmanager.memory.process.size: 6gb
   taskmanager.memory.task.heap.size: 4gb
   
   # To exclude JVM metaspace and overhead, please, use total Flink memory size 
instead of 'taskmanager.memory.process.size'.
   # It is not recommended to set both 'taskmanager.memory.process.size' and 
Flink memory.
   #
   taskmanager.memory.flink.size: 1gb
   
   # TODO(reddaly): Perhaps use taskmanager.memory.managed.size instead:
   # https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config
   taskmanager.memory.managed.size: 4gb
   
   # The number of task slots that each TaskManager offers. Each slot runs one 
parallel pipeline.
   
   # taskmanager.numberOfTaskSlots: 1
   
   # The parallelism used for programs that did not specify and other 
parallelism.
   
   parallelism.default: 6
   
   # The default file system scheme and authority.
   # 
   # By default file paths without scheme are interpreted relative to the local
   # root file system 'file:///'. Use this to override the default and interpret
   # relative paths relative to a different file system,
   # for example 'hdfs://mynamenode:12345'
   #
   # fs.default-scheme
   
   
#==============================================================================
   # High Availability
   
#==============================================================================
   
   # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
   #
   # high-availability: zookeeper
   
   # The path where metadata for master recovery is persisted. While ZooKeeper 
stores
   # the small ground truth for checkpoint and leader election, this location 
stores
   # the larger objects, like persisted dataflow graphs.
   # 
   # Must be a durable file system that is accessible from all nodes
   # (like HDFS, S3, Ceph, nfs, ...) 
   #
   # high-availability.storageDir: hdfs:///flink/ha/
   
   # The list of ZooKeeper quorum peers that coordinate the high-availability
   # setup. This must be a list of the form:
   # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
   #
   # high-availability.zookeeper.quorum: localhost:2181
   
   
   # ACL options are based on 
https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
   # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" 
(ZOO_OPEN_ACL_UNSAFE)
   # The default value is "open" and it can be changed to "creator" if ZK 
security is enabled
   #
   # high-availability.zookeeper.client.acl: open
   
   
#==============================================================================
   # Fault tolerance and checkpointing
   
#==============================================================================
   
   # The backend that will be used to store operator state checkpoints if
   # checkpointing is enabled. Checkpointing is enabled when 
execution.checkpointing.interval > 0.
   #
   # Execution checkpointing related parameters. Please refer to 
CheckpointConfig and ExecutionCheckpointingOptions for more details.
   #
   # execution.checkpointing.interval: 3min
   # execution.checkpointing.externalized-checkpoint-retention: 
[DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
   # execution.checkpointing.max-concurrent-checkpoints: 1
   # execution.checkpointing.min-pause: 0
   # execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
   # execution.checkpointing.timeout: 10min
   # execution.checkpointing.tolerable-failed-checkpoints: 0
   # execution.checkpointing.unaligned: false
   #
   # Supported backends are 'hashmap', 'rocksdb', or the
   # <class-name-of-factory>.
   #
   # state.backend: hashmap
   
   state.backend: rocksdb
   state.backend.incremental: true
   state.checkpoints.dir: /tmp/rocks-state # location to store checkpoints
   
   # Directory for checkpoints filesystem, when using any of the default bundled
   # state backends.
   #
   # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
   
   # Default target directory for savepoints, optional.
   #
   # state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
   
   # Flag to enable/disable incremental checkpoints for backends that
   # support incremental checkpoints (like the RocksDB state backend). 
   #
   # state.backend.incremental: false
   
   # The failover strategy, i.e., how the job computation recovers from task 
failures.
   # Only restart tasks that may have been affected by the task failure, which 
typically includes
   # downstream tasks and potentially upstream tasks if their produced data is 
no longer available for consumption.
   
   # jobmanager.execution.failover-strategy: region
   
   
#==============================================================================
   # Rest & web frontend
   
#==============================================================================
   
   # The port to which the REST client connects to. If rest.bind-port has
   # not been specified, then the server will bind to this port as well.
   #
   #rest.port: 8081
   
   # The address to which the REST client will connect to
   #
   # rest.address: localhost
   
   # Port range for the REST and web server to bind to.
   #
   # rest.bind-port: 8080-8090
   
   # The address that the REST & web server binds to
   # By default, this is localhost, which prevents the REST & web server from
   # being able to communicate outside of the machine/container it is running 
on.
   #
   # To enable this, set the bind address to one that has access to 
outside-facing
   # network interface, such as 0.0.0.0.
   #
   #rest.bind-address: localhost
   rest.bind-port: 4243
   
   # See 
https://sourcegraph.com/github.com/apache/flink/-/blob/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
   #web.bind-address: localhost
   web.port: 4242
   #web.ssl.enabled: false
   
   # Flag to specify whether job submission is enabled from the web-based
   # runtime monitor. Uncomment to disable.
   
   #web.submit.enable: false
   
   # Flag to specify whether job cancellation is enabled from the web-based
   # runtime monitor. Uncomment to disable.
   
   #web.cancel.enable: false
   
   
#==============================================================================
   # Advanced
   
#==============================================================================
   
   # Override the directories for temporary files. If not specified, the
   # system-specific Java temporary directory (java.io.tmpdir property) is 
taken.
   #
   # For framework setups on Yarn, Flink will automatically pick up the
   # containers' temp directories without any need for configuration.
   #
   # Add a delimited list for multiple directories, using the system directory
   # delimiter (colon ':' on unix) or a comma, e.g.:
   #     /data1/tmp:/data2/tmp:/data3/tmp
   #
   # Note: Each directory entry is read from and written to by a different I/O
   # thread. You can include the same directory multiple times in order to 
create
   # multiple I/O threads against that directory. This is for example relevant 
for
   # high-throughput RAIDs.
   #
   # io.tmp.dirs: /tmp
   
   # The classloading resolve order. Possible values are 'child-first' (Flink's 
default)
   # and 'parent-first' (Java's default).
   #
   # Child first classloading allows users to use different dependency/library
   # versions in their application than those in the classpath. Switching back
   # to 'parent-first' may help with debugging dependency issues.
   #
   # classloader.resolve-order: child-first
   
   # The amount of memory going to the network stack. These numbers usually 
need 
   # no tuning. Adjusting them may be necessary in case of an "Insufficient 
number
   # of network buffers" error. The default min is 64MB, the default max is 1GB.
   # 
   # taskmanager.memory.network.fraction: 0.1
   #taskmanager.memory.network.min: 64mb
   taskmanager.memory.network.min: 1gb
   taskmanager.memory.network.max: 1gb
   
   
#==============================================================================
   # Flink Cluster Security Configuration
   
#==============================================================================
   
   # Kerberos authentication for various components - Hadoop, ZooKeeper, and 
connectors -
   # may be enabled in four steps:
   # 1. configure the local krb5.conf file
   # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ 
kinit)
   # 3. make the credentials available to various JAAS login contexts
   # 4. configure the connector to use JAAS/SASL
   
   # The below configure how Kerberos credentials are provided. A keytab will 
be used instead of
   # a ticket cache if the keytab path and principal are set.
   
   # security.kerberos.login.use-ticket-cache: true
   # security.kerberos.login.keytab: /path/to/kerberos/keytab
   # security.kerberos.login.principal: flink-user
   
   # The configuration below defines which JAAS login contexts
   
   # security.kerberos.login.contexts: Client,KafkaClient
   
   
#==============================================================================
   # ZK Security Configuration
   
#==============================================================================
   
   # Below configurations are applicable if ZK ensemble is configured for 
security
   
   # Override below configuration to provide custom ZK service name if 
configured
   # zookeeper.sasl.service-name: zookeeper
   
   # The configuration below must match one of the values set in 
"security.kerberos.login.contexts"
   # zookeeper.sasl.login-context-name: Client
   
   
#==============================================================================
   # HistoryServer
   
#==============================================================================
   
   # The HistoryServer is started and stopped via bin/historyserver.sh 
(start|stop)
   
   # Directory to upload completed jobs to. Add this directory to the list of
   # monitored directories of the HistoryServer as well (see below).
   #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
   
   # The address under which the web-based HistoryServer listens.
   #historyserver.web.address: 0.0.0.0
   
   # The port under which the web-based HistoryServer listens.
   #historyserver.web.port: 8082
   
   # Comma separated list of directories to monitor for completed jobs.
   #historyserver.archive.fs.dir: hdfs:///completed-jobs/
   
   # Interval in milliseconds for refreshing the monitored directories.
   #historyserver.archive.fs.refresh-interval: 10000
   ```
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-go


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to