This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 793bb795c3 [Fix][Iceberg] Fix IllegalMonitorStateException in streaming enumerator (#10131)
793bb795c3 is described below
commit 793bb795c37d8e10b395ef0b601c27dd5393ab6a
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Dec 24 10:19:20 2025 +0800
[Fix][Iceberg] Fix IllegalMonitorStateException in streaming enumerator (#10131)
Co-authored-by: zengyi <[email protected]>
---
.../enumerator/IcebergStreamSplitEnumerator.java | 16 ++-
.../IcebergStreamSplitEnumeratorTest.java | 139 +++++++++++++++++++++
2 files changed, 150 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
index 84ebcb32a5..fffca402bc 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -46,7 +47,8 @@ import java.util.concurrent.ConcurrentMap;
public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator {
private final ConcurrentMap<TablePath, IcebergEnumeratorPosition>
tableOffsets;
- private volatile boolean initialized = false;
+
+ @VisibleForTesting volatile boolean initialized = false;
public IcebergStreamSplitEnumerator(
Context<IcebergFileScanTaskSplit> context,
@@ -79,9 +81,9 @@ public class IcebergStreamSplitEnumerator extends
AbstractSplitEnumerator {
Set<Integer> readers = context.registeredReaders();
while (true) {
for (TablePath tablePath : pendingTables) {
- checkThrowInterruptedException();
-
synchronized (stateLock) {
+ checkThrowInterruptedException();
+
log.info("Scan table {}.", tablePath);
Collection<IcebergFileScanTaskSplit> splits =
loadSplits(tablePath);
@@ -95,7 +97,9 @@ public class IcebergStreamSplitEnumerator extends
AbstractSplitEnumerator {
initialized = true;
}
- stateLock.wait(sourceConfig.getIncrementScanInterval());
+ synchronized (stateLock) {
+ stateLock.wait(sourceConfig.getIncrementScanInterval());
+ }
}
}
@@ -112,7 +116,9 @@ public class IcebergStreamSplitEnumerator extends
AbstractSplitEnumerator {
@Override
public void handleSplitRequest(int subtaskId) {
if (initialized) {
- stateLock.notifyAll();
+ synchronized (stateLock) {
+ stateLock.notifyAll();
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
new file mode 100644
index 0000000000..7e8ec18e5f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
+
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCommonOptions;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Minimal test for {@link IcebergStreamSplitEnumerator} wait / notify fix. */
+class IcebergStreamSplitEnumeratorTest {
+
+ @Test
+ void testHandleSplitRequestDoesNotThrowIllegalMonitorStateException()
throws Exception {
+ SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context =
+ new DummyEnumeratorContext();
+
+ IcebergSourceConfig sourceConfig = createSourceConfig();
+
+ // Catalog tables must be non-empty because AbstractSplitEnumerator uses the size as the
+ // capacity of an ArrayBlockingQueue.
+ TablePath tablePath = TablePath.of("default", "source");
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("seatunnel", "default", "source"),
+ TableSchema.builder().build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "test table");
+ Map<TablePath, CatalogTable> catalogTables =
+ Collections.singletonMap(tablePath, catalogTable);
+
+ IcebergStreamSplitEnumerator enumerator =
+ new IcebergStreamSplitEnumerator(
+ context, sourceConfig, catalogTables,
Collections.emptyMap());
+
+ // Force initialized = true so handleSplitRequest executes the notify logic.
+ enumerator.initialized = true;
+
+ // Before the fix, this would throw IllegalMonitorStateException because notifyAll was
+ // called without holding the monitor.
+ Assertions.assertDoesNotThrow(() -> enumerator.handleSplitRequest(0));
+ }
+
+ private IcebergSourceConfig createSourceConfig() {
+ Map<String, Object> configs = new HashMap<>();
+ Map<String, Object> catalogProps = new HashMap<>();
+ catalogProps.put("type", "hadoop");
+ catalogProps.put("warehouse", Paths.get("target", "iceberg",
"hadoop").toUri().toString());
+
+ configs.put(IcebergCommonOptions.KEY_CATALOG_NAME.key(), "seatunnel");
+ configs.put(IcebergCommonOptions.KEY_NAMESPACE.key(), "default");
+ configs.put(IcebergCommonOptions.KEY_TABLE.key(), "source");
+ configs.put(IcebergCommonOptions.CATALOG_PROPS.key(), catalogProps);
+
+ return new IcebergSourceConfig(ReadonlyConfig.fromMap(configs));
+ }
+
+ private static class DummyEnumeratorContext
+ implements SourceSplitEnumerator.Context<IcebergFileScanTaskSplit>
{
+
+ private final MetricsContext metricsContext = new
AbstractMetricsContext() {};
+ private final EventListener eventListener =
+ new EventListener() {
+ @Override
+ public void onEvent(Event event) {
+ // no-op
+ }
+ };
+
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
+
+ @Override
+ public java.util.Set<Integer> registeredReaders() {
+ return Collections.singleton(0);
+ }
+
+ @Override
+ public void assignSplit(int subtaskId,
java.util.List<IcebergFileScanTaskSplit> splits) {
+ // no-op
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+ // no-op
+ }
+
+ @Override
+ public void sendEventToSourceReader(
+ int subtaskId, org.apache.seatunnel.api.source.SourceEvent
event) {
+ // no-op
+ }
+
+ @Override
+ public MetricsContext getMetricsContext() {
+ return metricsContext;
+ }
+
+ @Override
+ public EventListener getEventListener() {
+ return eventListener;
+ }
+ }
+}