This is an automated email from the ASF dual-hosted git repository.
doyeon pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new afb610e85c [Docs][Core] Add Javadoc to MultiTableSink (#10639)
afb610e85c is described below
commit afb610e85ce81913397e9a1fc642ee796d8d3130
Author: zoo-code <[email protected]>
AuthorDate: Sat Mar 28 18:55:39 2026 +0900
[Docs][Core] Add Javadoc to MultiTableSink (#10639)
---
.../api/sink/multitablesink/MultiTableSink.java | 92 ++++++++++++++++++++++
1 file changed, 92 insertions(+)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index b02e51fecc..713dc96506 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -45,6 +45,14 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+/**
+ * Sink adapter that wraps multiple per-table {@link SeaTunnelSink} instances into a single
+ * unified sink. The per-table sinks are obtained from {@link MultiTableFactoryContext} and are
+ * managed independently for writing, committing, and state snapshotting.
+ *
+ * <p>In every subtask, {@code replicaNum} writers are created per table; together they fill the
+ * blocking write queues in {@link MultiTableSinkWriter} to improve multi-table throughput.
+ */
public class MultiTableSink
implements SeaTunnelSink<
SeaTunnelRow,
@@ -56,6 +64,16 @@ public class MultiTableSink
@Getter private final Map<TablePath, SeaTunnelSink> sinks;
private final int replicaNum;
+ /**
+ * Constructs a MultiTableSink from the given factory context.
+ *
+ * <p>The {@code sinks} field is assigned the map returned by {@link
+ * MultiTableFactoryContext#getSinks()} (not a copy), keyed by {@link TablePath}. The
+ * {@code replicaNum} value controls how many writers each subtask creates for every table;
+ * those writers fill the blocking queues in {@link MultiTableSinkWriter}.
+ *
+ * @param context the factory context containing per-table sinks and configuration options
+ */
public MultiTableSink(MultiTableFactoryContext context) {
this.sinks = context.getSinks();
this.replicaNum =
@@ -67,6 +85,18 @@ public class MultiTableSink
return "MultiTableSink";
}
+ /**
+ * Creates a new {@link MultiTableSinkWriter} with freshly initialized per-table writers.
+ *
+ * <p>For each table and each replica {@code i} (0 to {@code replicaNum - 1}), a writer is
+ * created with the index {@code subtaskIndex * replicaNum + i}, so indexes from different
+ * subtasks never overlap. {@link MultiTableSinkWriter} spreads these writers across its blocking
+ * queues to distribute the write load.
+ *
+ * @param context the sink writer context providing subtask index and parallelism info
+ * @return a new {@link MultiTableSinkWriter} wrapping all per-table writers
+ * @throws IOException if any per-table writer creation fails
+ */
@Override
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>
createWriter(
SinkWriter.Context context) throws IOException {
@@ -86,6 +116,19 @@ public class MultiTableSink
return new MultiTableSinkWriter(writers, replicaNum,
sinkWritersContext);
}
+ /**
+ * Restores a {@link MultiTableSinkWriter} from previously checkpointed states.
+ *
+ * <p>Checkpoint states are matched back to per-table writers using {@link SinkIdentifier}
+ * (composed of the table identifier and the computed writer index). If no matching state is
+ * found for a given table and replica, a fresh writer is created instead via {@link
+ * SeaTunnelSink#createWriter(SinkWriter.Context)}.
+ *
+ * @param context the sink writer context providing subtask index and parallelism info
+ * @param states the list of checkpoint states from a previous snapshot
+ * @return a {@link MultiTableSinkWriter} whose per-table writers are restored or freshly created
+ * @throws IOException if any per-table writer restoration fails
+ */
@Override
public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState>
restoreWriter(
SinkWriter.Context context, List<MultiTableState> states) throws
IOException {
@@ -126,6 +169,16 @@ public class MultiTableSink
return Optional.of(new DefaultSerializer<>());
}
+ /**
+ * Creates a {@link MultiTableSinkCommitter} that combines the per-table {@link SinkCommitter}
+ * instances.
+ *
+ * <p>Iterates over all registered sinks and collects their committers. If none of the sub-sinks
+ * provides a committer, returns {@link Optional#empty()}.
+ *
+ * @return an optional containing the combined committer, or empty if no sub-sink has one
+ * @throws IOException if any per-table committer creation fails
+ */
@Override
public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter()
throws IOException {
Map<String, SinkCommitter<?>> committers = new HashMap<>();
@@ -148,6 +201,16 @@ public class MultiTableSink
return Optional.of(new DefaultSerializer<>());
}
+ /**
+ * Creates a {@link MultiTableSinkAggregatedCommitter} that combines the per-table {@link
+ * SinkAggregatedCommitter} instances of all sub-sinks.
+ *
+ * <p>If none of the sub-sinks provides an aggregated committer, returns {@link
+ * Optional#empty()}.
+ *
+ * @return an optional containing the aggregated committer, or empty if no sub-sink has one
+ * @throws IOException if any per-table aggregated committer creation fails
+ */
@Override
public Optional<SinkAggregatedCommitter<MultiTableCommitInfo,
MultiTableAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
@@ -165,6 +228,15 @@ public class MultiTableSink
return Optional.of(new
MultiTableSinkAggregatedCommitter(aggCommitters));
}
+ /**
+ * Returns the list of {@link TablePath}s for all tables managed by this sink.
+ *
+ * <p>For each sub-sink, tries {@link SeaTunnelSink#getWriteCatalogTable()} first to extract the
+ * table path from the catalog table. If that is not present, falls back to the sub-sink's
+ * {@link TablePath} key in the original {@code sinks} map.
+ *
+ * @return the list of table paths for all managed tables
+ */
public List<TablePath> getSinkTables() {
List<TablePath> tablePaths = new ArrayList<>();
@@ -191,11 +263,31 @@ public class MultiTableSink
sinks.values().forEach(sink -> sink.setJobContext(jobContext));
}
+ /**
+ * Returns {@link Optional#empty()}, because a multi-table sink has no single write table.
+ *
+ * <p>In a multi-table sink, catalog tables are managed individually by each sub-sink rather
+ * than at the top level (see {@link #getSinkTables()}). This method delegates to the
+ * {@link SeaTunnelSink} interface default, which returns {@link Optional#empty()}.
+ *
+ * @return {@link Optional#empty()}, always
+ */
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return SeaTunnelSink.super.getWriteCatalogTable();
}
+ /**
+ * Delegates schema evolution support to the first sub-sink in the sinks map's iteration order.
+ *
+ * <p>Precondition: the sinks map must contain at least one entry; the first sub-sink is taken
+ * from the map's iterator, which throws {@link java.util.NoSuchElementException} when empty.
+ *
+ * <p>If the first sub-sink implements {@link SupportSchemaEvolutionSink}, returns its supported
+ * {@link SchemaChangeType} list; otherwise returns an empty list (no schema evolution support).
+ *
+ * @return the list of supported schema change types, or empty if not supported
+ */
@Override
public List<SchemaChangeType> supports() {
SeaTunnelSink firstSink =
sinks.entrySet().iterator().next().getValue();