kishanshukla-2307 opened a new issue, #36539:
URL: https://github.com/apache/beam/issues/36539
### What happened?
I’m trying to run a Go → Kafka → Go (KafkaIO) proof of concept using Apache
Beam Go SDK v2.68 with the Flink portable runner.
The job starts successfully and launches two SDK harnesses (Go and Java),
but the KafkaIO read transform never emits elements — the job stays idle and
periodically logs “operation ongoing in bundle … without outputting or
completing.”
Messages published to the input topic are not seen in the Go ParDo or
written to the output topic.
Environment

| Component | Version / Setup |
| --- | --- |
| Beam Go SDK | 2.68.0 |
| Flink Runner | 1.19 (MiniCluster via beam-runners-flink-1.19-job-server-2.68.0.jar) |
| Expansion Service | beam-sdks-java-io-expansion-service-2.68.0.jar |
| Kafka | v4.0.0, local (localhost:9092) |
| Harness environment | Docker (apache/beam_go_sdk:2.68.0, apache/beam_java17_sdk:2.68.0) |
| OS | macOS (Docker Desktop) |
Runner options used: `--runner=portable --endpoint=localhost:8099 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0 --expansion_addr=localhost:8088`
What works

- Go-only pipelines (beam.Create → ParDo(log)) complete successfully (see the sketch below).
- The Kafka producer and consumer work fine outside Beam.
- Two SDK harnesses start as expected (Go + Java).
- The Java harness logs Kafka producer initialization.
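For reference, the Go-only sanity check is essentially the following (a minimal sketch rather than the exact file; the function name is illustrative):

```go
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// logElement just logs each element.
func logElement(ctx context.Context, elem string) {
	log.Infof(ctx, "element: %v", elem)
}

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p, s := beam.NewPipelineWithRoot()
	col := beam.Create(s, "a", "b", "c") // bounded in-memory input
	beam.ParDo0(s, logElement, col)      // ParDo(log), no outputs

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Pipeline failed: %v", err)
	}
}
```

This completes with the same runner flags listed above.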
What fails

- KafkaIO.Read emits no elements to the Go ParDo.
- No messages appear on the output topic (when KafkaIO.Write is enabled).
- Go DoFn logs (log.Infof) never appear.
- The job remains active with the "bundle ongoing" warnings shown above.
My pipeline code:
```go
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

const (
	kafkaBroker = "localhost:9092"
	inputTopic  = "input"
	outputTopic = "output"
)

var expansionAddr = flag.String("expansion_addr", "", "Address of the Kafka expansion service")

// logRecord logs each Kafka record and passes it through unchanged.
func logRecord(ctx context.Context, k, v []byte) ([]byte, []byte) {
	log.Infof(ctx, "got record key=%q val=%q", string(k), string(v))
	return k, v
}

func main() {
	flag.Parse()
	beam.Init()
	ctx := context.Background()

	p, s := beam.NewPipelineWithRoot()

	// Cross-language KafkaIO read via the Java expansion service.
	input := kafkaio.Read(
		s,
		*expansionAddr,
		kafkaBroker,
		[]string{inputTopic},
	)
	beam.ParDo(s, logRecord, input)
	kafkaio.Write(s, *expansionAddr, kafkaBroker, outputTopic, input)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Pipeline failed: %v", err)
	}
}
```
Reproduction Steps

1. Start Kafka locally and create the `input` and `output` topics.
2. Run the Kafka expansion service: `java -jar beam-sdks-java-io-expansion-service-2.68.0.jar 8088 --javaClassLookupAllowlistFile='*'`
3. Start the Flink job server: `java -jar beam-runners-flink-1.19-job-server-2.68.0.jar --job-port=8099 --artifact-port=8098`
4. Run the Go pipeline: `go run . --runner=portable --endpoint=localhost:8099 --expansion_addr=localhost:8088 --environment_type=DOCKER --environment_config=apache/beam_go_sdk:2.68.0`
5. Produce a few messages to the Kafka `input` topic (see the command sketch below).
6. Observe that no messages are logged and no output is produced.
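For step 5, something like the stock Kafka console producer works (a sketch; the exact path depends on the installation):

```sh
# From the Kafka install directory: send a few test messages to the input topic.
# The console producer then prompts interactively; type one message per line.
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input
```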
Logs

Job server (excerpt):
```
Oct 16, 2025 8:23:47 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: initializing Kafka metrics collector
Oct 16, 2025 8:23:47 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka version: 3.9.0
Oct 16, 2025 8:23:47 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka commitId: 84caaa6e9da06435
Oct 16, 2025 8:23:47 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: Kafka startTimeMs: 1760626427960
Oct 16, 2025 8:23:47 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_904017666_none-3,
groupId=Reader-0_offset_consumer_904017666_none] Cluster ID:
5L6g3nShT-eMCtK--X86sw
Oct 16, 2025 8:23:49 PM
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
getProcessBundleDescriptor
INFO: getProcessBundleDescriptor request with id 1-3
Oct 16, 2025 8:23:49 PM
org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter log
WARNING: Failed to parse element_processing_timeout: time: invalid duration
"", there will be no timeout for processing an element in a PTransform operation
Oct 16, 2025 8:23:49 PM
org.apache.beam.runners.fnexecution.data.GrpcDataService data
INFO: Beam Fn Data client connected.
...
WARNING: Operation ongoing in bundle 1 for at least 05m00s without
outputting or completing
```
Go harness:
```
2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1"
value:{string_value:"go-job-1-1760626399474061000"}}
fields:{key:"beam:option:experiments:v1"
value:{list_value:{values:{string_value:"beam_fn_api"}}}}
fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}}
fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}}
fields:{key:"beam:option:go_options:v1"
value:{struct_value:{fields:{key:"options"
value:{struct_value:{fields:{key:"endpoint"
value:{string_value:"localhost:8099"}} fields:{key:"environment_config"
value:{string_value:"apache/beam_go_sdk:2.68.0"}}
fields:{key:"environment_type" value:{string_value:"DOCKER"}}
fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}}
fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}}
fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}}
fields:{key:"runner" value:{string_value:"portable"}}}}}}}}
fields:{key:"beam:option:
job_name:v1"
value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}}
fields:{key:"beam:option:options_id:v1" value:{number_value:2}}
fields:{key:"beam:option:output_executable_path:v1"
value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1"
value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1"
value:{bool_value:false}} fields:{key:"beam:option:runner:v1"
value:{null_value:NULL_VALUE}}}
retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b"
logging_endpoint:{url:"host.docker.internal:61976"}
artifact_endpoint:{url:"host.docker.internal:61978"}
control_endpoint:{url:"host.docker.internal:61974"}
dependencies:{type_urn:"beam:artifact:type:file:v1"
type_payload:"\n\xdc\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/1-0:go-/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/worker-1-1760626399474073000"
role_urn:"beam:artifact:role:go_worker_binary:v1"}
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:40 2025/10/16 14:53:40 Downloaded:
/tmp/staged/1-worker-1-1760626399474073000 (sha256:
4e8c52729f472480c02833507b225ea334663bea8426cd623e2afc6993ed630f, size:
67584651)
2025-10-16 20:23:43 Error Setting Rlimit operation not permitted
```
Java harness:
```
2025-10-16 20:23:38 2025/10/16 14:53:38 Provision info:
2025-10-16 20:23:38 pipeline_options:{fields:{key:"beam:option:app_name:v1"
value:{string_value:"go-job-1-1760626399474061000"}}
fields:{key:"beam:option:experiments:v1"
value:{list_value:{values:{string_value:"beam_fn_api"}}}}
fields:{key:"beam:option:flink_conf_dir:v1" value:{null_value:NULL_VALUE}}
fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}}
fields:{key:"beam:option:go_options:v1"
value:{struct_value:{fields:{key:"options"
value:{struct_value:{fields:{key:"endpoint"
value:{string_value:"localhost:8099"}} fields:{key:"environment_config"
value:{string_value:"apache/beam_go_sdk:2.68.0"}}
fields:{key:"environment_type" value:{string_value:"DOCKER"}}
fields:{key:"expansion_addr" value:{string_value:"localhost:8088"}}
fields:{key:"hookOrder" value:{string_value:"[\"default_remote_logging\"]"}}
fields:{key:"hooks" value:{string_value:"{\"default_remote_logging\":null}"}}
fields:{key:"runner" value:{string_value:"portable"}}}}}}}} f
ields:{key:"beam:option:job_name:v1"
value:{string_value:"go0job0101760626399474061000-kishan-1016145325-78a374a3"}}
fields:{key:"beam:option:options_id:v1" value:{number_value:2}}
fields:{key:"beam:option:output_executable_path:v1"
value:{null_value:NULL_VALUE}} fields:{key:"beam:option:parallelism:v1"
value:{number_value:-1}} fields:{key:"beam:option:retain_docker_containers:v1"
value:{bool_value:false}} fields:{key:"beam:option:runner:v1"
value:{null_value:NULL_VALUE}}}
retrieval_token:"go-job-1-1760626399474061000_5cf318dd-51e6-493b-93bb-3cc23d02842b"
logging_endpoint:{url:"host.docker.internal:61977"}
artifact_endpoint:{url:"host.docker.internal:61979"}
control_endpoint:{url:"host.docker.internal:61975"}
dependencies:{type_urn:"beam:artifact:type:file:v1"
type_payload:"\n\xec\x01/var/folders/qx/ttwsvpc52bj63wkm_21nhxx40000gn/T/beam-artifact-staging/7085c9eac9bb0341b8b0032b2bb08a63bdd259069eaa3e4a3823483c3c1c5a2b/2-0:pAOCMJEhVBbeam:env:dock-beam-sdks-java-io-exp
ansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjo"
role_urn:"beam:artifact:role:staging_to:v1"
role_payload:"\nZbeam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar"}
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2025-10-16 20:23:44 2025/10/16 14:53:44 Downloaded:
/tmp/1-1/staged/beam-sdks-java-io-expansion-service-2.68.0-DlPxq5ePGVaO4Mfpmkn3NROOlbNjor7qSKlAF02OBD8.jar
(sha256: 0e53f1ab978f19568ee0c7e99a49f735138e95b363a2beea48a940174d8e043f,
size: 387766088)
2025-10-16 20:23:45 Running JvmInitializer#onStartup for
org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 Completed JvmInitializer#onStartup for
org.apache.beam.sdk.io.kafka.KafkaIOInitializer@7a92922
2025-10-16 20:23:45 SDK Fn Harness started
2025-10-16 20:23:45 Harness ID 1-1
2025-10-16 20:23:45 Logging location url:"host.docker.internal:61977"
2025-10-16 20:23:45 Control location url:"host.docker.internal:61975"
2025-10-16 20:23:45 Status location null
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json
2025-10-16 20:23:45 Pipeline Options File pipeline_options.json exists.
Overriding existing options.
2025-10-16 20:23:45 Pipeline options
{"beam:option:app_name:v1":"go-job-1-1760626399474061000",
"beam:option:experiments:v1":["beam_fn_api"],
"beam:option:flink_conf_dir:v1":null, "beam:option:flink_master:v1":"[auto]",
"beam:option:go_options:v1":{"options":{"endpoint":"localhost:8099",
"environment_config":"apache/beam_go_sdk:2.68.0", "environment_type":"DOCKER",
"expansion_addr":"localhost:8088", "hookOrder":"[\"default_remote_logging\"]",
"hooks":"{\"default_remote_logging\":null}", "runner":"portable"}},
"beam:option:job_name:v1":"go0job0101760626399474061000-kishan-1016145325-78a374a3",
"beam:option:options_id:v1":2, "beam:option:output_executable_path:v1":null,
"beam:option:parallelism:v1":-1,
"beam:option:retain_docker_containers:v1":false, "beam:option:runner:v1":null}
```
(No errors appear after this point; the job keeps running but stays idle.)
Is KafkaIO fully supported for Go SDK cross-language pipelines in Beam
2.68.0?
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [x] Component: Go SDK
- [ ] Component: Typescript SDK
- [x] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [x] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner