masfworld opened a new issue, #35940: URL: https://github.com/apache/beam/issues/35940
### What happened? ### Description When writing to Iceberg tables with Managed.write(Managed.ICEBERG) in a streaming pipeline, I consistently see the following error in Dataflow workers: ``` java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open at org.apache.beam.sdk.io.iceberg.RecordWriterManager.close(RecordWriterManager.java:395) at org.apache.beam.sdk.io.iceberg.WriteGroupedRowsToFiles$WriteGroupedRowsToFilesDoFn.processElement(WriteGroupedRowsToFiles.java:110) ``` Once it starts, this error repeats many times after 20-30 min, but not always in the same moment; the metric activeIcebergWriters rises (e.g., 50+) and the job stops writing new data. There is no consistent user-code exception preceding it. It occurs even after we serialize the sink to a single lane and reduce parallelism to the minimum. ### Steps to reproduce Minimal pipeline (simplified for reproducibility): ``` public class MainSimpleCount { private static final Logger LOG = LoggerFactory.getLogger(MainSimpleCount.class); public static void main(String[] args) throws Exception { IcebergPipelineOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(IcebergPipelineOptions.class); options.setStreaming(true); options.setEnableStreamingEngine(true); Pipeline pipeline = Pipeline.create(options); // Parse JSON into Rows (note: schema fields are UPPERCASE per your error log) PCollection<Row> input = pipeline .apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(options.getInputSubscription())) .apply("ParseJsonToRow", ParDo.of(new ParseJsonToRow())) .setRowSchema(Schemas.INPUT_SCHEMA); // Simple 1-minute fixed window on processing time (no triggers, no accumulation) PCollection<Row> windowed = input.apply("Window1m", Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1)))); // Key by ID we derive from UPPERCASE fields: a + '|' + b PCollection<KV<String, Row>> keyed = windowed.apply("KeyById", WithKeys.of((Row r) -> { String a = r.getString("a"); String b = r.getString("b"); return a + "|" + b; })) .setCoder(org.apache.beam.sdk.coders.KvCoder.of( org.apache.beam.sdk.coders.StringUtf8Coder.of(), org.apache.beam.sdk.schemas.SchemaCoder.of(Schemas.INPUT_SCHEMA))); // Group by ID PCollection<KV<String, Iterable<Row>>> grouped = keyed.apply("GroupById", GroupByKey.create()); // Count per key -> build output Row(ID, count_id) PCollection<Row> counted = grouped.apply("CountRows", ParDo.of(new DoFn<KV<String, Iterable<Row>>, Row>() { @ProcessElement public void processElement(ProcessContext c) { String Id = c.element().getKey(); long count = 0; for (Row r : c.element().getValue()) { count++; } Row outRow = Row.withSchema(Schemas.CC_COUNT_SCHEMA) .addValues(Id, count) .build(); c.output(outRow); } })) .setRowSchema(Schemas.CC_COUNT_SCHEMA); // Write to Iceberg counted.apply("WriteToIceberg", Managed.write(Managed.ICEBERG) .withConfig(ImmutableMap.<String, Object>builder() .put("table", "test_namespace.prices_iceberg_table_1m_id_count") .put("catalog_name", "snowflake_open_catalog_internal") .put("catalog_properties", ImmutableMap.<String, String>builder() .put("type", "rest") .put("uri", "https://XXXX.XXXXX.gcp.snowflakecomputing.com/polaris/api/catalog/") .put("oauth2-server-uri", "https://XXXX.XXXX.gcp.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens") .put("header.X-Iceberg-Access-Delegation", "vended-credentials") .put("credential", options.getCredential()) .put("warehouse", "snowflake_open_catalog_internal") .put("scope", "PRINCIPAL_ROLE:principal_role") .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO") .put("token-refresh-enabled", "false") .build()) // only keep the two table columns .put("keep", java.util.List.of("id", "count_id")) .put("triggering_frequency_seconds", 120) .build())); pipeline.run(); } } ``` Environment: - Apache Beam: 2.67.0 - Runner: Google Cloud Dataflow (streaming) - Input source: Pub/Sub → JSON → Beam Row - Snowflake Open Catalog for Iceberg catalog ### Issue Priority Priority: 1 (data loss / total loss of function) ### Issue Components - [ ] Component: Python SDK - [x] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [ ] Component: IO connector - [ ] Component: Beam YAML - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Infrastructure - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [x] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org