This is an automated email from the ASF dual-hosted git repository.

doyeon pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new afb610e85c [Docs][Core] Add Javadoc to MultiTableSink (#10639)
afb610e85c is described below

commit afb610e85ce81913397e9a1fc642ee796d8d3130
Author: zoo-code <[email protected]>
AuthorDate: Sat Mar 28 18:55:39 2026 +0900

    [Docs][Core] Add Javadoc to MultiTableSink (#10639)
---
 .../api/sink/multitablesink/MultiTableSink.java    | 92 ++++++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
index b02e51fecc..713dc96506 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java
@@ -45,6 +45,14 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+/**
+ * Central sink adapter that wraps multiple per-table {@link SeaTunnelSink} instances into a single
+ * unified sink. Each table's sink is created from the {@link MultiTableFactoryContext} and managed
+ * independently for writing, committing, and state snapshotting.
+ *
+ * <p>This class multiplies writers per subtask using {@code replicaNum} to fill blocking write
+ * queues in {@link MultiTableSinkWriter}, improving throughput for multi-table workloads.
+ */
 public class MultiTableSink
         implements SeaTunnelSink<
                         SeaTunnelRow,
@@ -56,6 +64,16 @@ public class MultiTableSink
     @Getter private final Map<TablePath, SeaTunnelSink> sinks;
     private final int replicaNum;
 
+    /**
+     * Constructs a MultiTableSink from the given factory context.
+     *
+     * <p>The {@code sinks} map is populated directly from {@link
+     * MultiTableFactoryContext#getSinks()}, keyed by {@link TablePath}. The {@code replicaNum}
+     * controls how many writers are created per table per subtask. Each subtask creates {@code
+     * replicaNum} writers to fill the blocking queues in {@link MultiTableSinkWriter}.
+     *
+     * @param context the factory context containing per-table sinks and configuration options
+     */
     public MultiTableSink(MultiTableFactoryContext context) {
         this.sinks = context.getSinks();
         this.replicaNum =
@@ -67,6 +85,18 @@ public class MultiTableSink
         return "MultiTableSink";
     }
 
+    /**
+     * Creates a new {@link MultiTableSinkWriter} with freshly initialized per-table writers.
+     *
+     * <p>For each table and each replica, a writer is created with a computed index using the
+     * formula {@code index = subtaskIndex * replicaNum + i}. This scatters writers across the
+     * blocking queues inside {@link MultiTableSinkWriter}, ensuring even distribution of write
+     * load.
+     *
+     * @param context the sink writer context providing subtask index and parallelism info
+     * @return a new {@link MultiTableSinkWriter} wrapping all per-table writers
+     * @throws IOException if any per-table writer creation fails
+     */
     @Override
     public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> createWriter(
             SinkWriter.Context context) throws IOException {
@@ -86,6 +116,19 @@ public class MultiTableSink
         return new MultiTableSinkWriter(writers, replicaNum, sinkWritersContext);
     }
 
+    /**
+     * Restores a {@link MultiTableSinkWriter} from previously checkpointed states.
+     *
+     * <p>Checkpoint states are matched back to per-table writers using {@link SinkIdentifier}
+     * (composed of table identifier and computed index). If no matching state is found for a given
+     * table and replica, a fresh writer is created instead via {@link
+     * SeaTunnelSink#createWriter(SinkWriter.Context)}.
+     *
+     * @param context the sink writer context providing subtask index and parallelism info
+     * @param states the list of checkpoint states from a previous snapshot
+     * @return a restored {@link MultiTableSinkWriter} with per-table writers rebuilt from state
+     * @throws IOException if any per-table writer restoration fails
+     */
     @Override
     public SinkWriter<SeaTunnelRow, MultiTableCommitInfo, MultiTableState> restoreWriter(
             SinkWriter.Context context, List<MultiTableState> states) throws IOException {
@@ -126,6 +169,16 @@ public class MultiTableSink
         return Optional.of(new DefaultSerializer<>());
     }
 
+    /**
+     * Creates a {@link MultiTableSinkCommitter} that aggregates per-table {@link SinkCommitter}
+     * instances.
+     *
+     * <p>Iterates over all registered sinks and collects their committers. If none of the sub-sinks
+     * provide a committer, returns {@link Optional#empty()}.
+     *
+     * @return an optional containing the aggregated committer, or empty if no sub-sink has one
+     * @throws IOException if any per-table committer creation fails
+     */
     @Override
     public Optional<SinkCommitter<MultiTableCommitInfo>> createCommitter() throws IOException {
         Map<String, SinkCommitter<?>> committers = new HashMap<>();
@@ -148,6 +201,16 @@ public class MultiTableSink
         return Optional.of(new DefaultSerializer<>());
     }
 
+    /**
+     * Creates a {@link MultiTableSinkAggregatedCommitter} that aggregates per-table {@link
+     * SinkAggregatedCommitter} instances across all sub-sinks.
+     *
+     * <p>If none of the sub-sinks provide an aggregated committer, returns {@link
+     * Optional#empty()}.
+     *
+     * @return an optional containing the aggregated committer, or empty if no sub-sink has one
+     * @throws IOException if any per-table aggregated committer creation fails
+     */
     @Override
     public Optional<SinkAggregatedCommitter<MultiTableCommitInfo, MultiTableAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
@@ -165,6 +228,15 @@ public class MultiTableSink
         return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters));
     }
 
+    /**
+     * Returns the list of {@link TablePath}s for all tables managed by this sink.
+     *
+     * <p>For each sub-sink, tries {@link SeaTunnelSink#getWriteCatalogTable()} first to extract the
+     * table path from the catalog table. If that is not present, falls back to using the {@link
+     * TablePath} key from the original sinks map.
+     *
+     * @return the list of table paths for all managed tables
+     */
     public List<TablePath> getSinkTables() {
 
         List<TablePath> tablePaths = new ArrayList<>();
@@ -191,11 +263,31 @@ public class MultiTableSink
         sinks.values().forEach(sink -> sink.setJobContext(jobContext));
     }
 
+    /**
+     * Always returns empty in multi-table context.
+     *
+     * <p>In a multi-table sink, catalog tables are managed individually by each sub-sink rather
+     * than at the top level. This method delegates to the parent interface default, which returns
+     * {@link Optional#empty()}.
+     *
+     * @return {@link Optional#empty()}, always
+     */
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
         return SeaTunnelSink.super.getWriteCatalogTable();
     }
 
+    /**
+     * Delegates schema evolution support to the first sub-sink.
+     *
+     * <p>Precondition: the sinks map must contain at least one entry.
+     *
+     * <p>If the first sub-sink implements {@link SupportSchemaEvolutionSink}, returns its supported
+     * {@link SchemaChangeType} list. Otherwise returns an empty list, indicating no schema
+     * evolution support.
+     *
+     * @return the list of supported schema change types, or empty if not supported
+     */
     @Override
     public List<SchemaChangeType> supports() {
         SeaTunnelSink firstSink = sinks.entrySet().iterator().next().getValue();

Reply via email to