This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch api-draft in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 5eb77fc3050bf6306f25843aba6a2c3020e014fa Author: Zongwen Li <[email protected]> AuthorDate: Thu Apr 28 10:29:44 2022 +0800 [Feature][core] base logic --- pom.xml | 1 + .../Serializer.java} | 26 ++-- .../java/org/apache/seatunnel/api/sink/Sink.java | 32 ++++- .../api/sink/SinkAggregatedCommitter.java | 10 +- .../apache/seatunnel/api/sink/SinkCommitter.java | 11 +- .../org/apache/seatunnel/api/sink/SinkWriter.java | 44 +++++- .../org/apache/seatunnel/api/source/Collector.java | 5 +- .../org/apache/seatunnel/api/source/Source.java | 18 ++- .../apache/seatunnel/api/source/SourceEvent.java | 7 +- .../apache/seatunnel/api/source/SourceReader.java | 41 +++++- .../apache/seatunnel/api/source/SourceSplit.java | 4 +- .../api/source/SourceSplitEnumerator.java | 68 +++++++++- .../{Collector.java => SupportCoordinate.java} | 5 +- .../seatunnel/api/state/CheckpointListener.java | 5 + .../seatunnel/api/state/StatefulOperator.java | 21 --- .../apache/seatunnel/api/table/catalog/Column.java | 37 +++-- .../api/table/catalog/TableIdentifier.java | 2 +- .../seatunnel/api/table/catalog/TableSchema.java | 4 +- .../table/catalog/exception/CatalogException.java | 14 +- .../table/connector/SupportReadingMetadata.java | 1 - .../seatunnel/api/table/connector/TableSink.java | 2 +- .../seatunnel/api/table/connector/TableSource.java | 2 +- .../api/table/factory/CatalogFactory.java | 11 +- .../factory/{CatalogFactory.java => Factory.java} | 11 +- ...TableFactoryUtil.java => FactoryException.java} | 9 +- .../seatunnel/api/table/factory/FactoryUtil.java | 150 +++++++++++++++++++++ .../api/table/factory/SupportMultipleTable.java | 2 +- ...{TableFactory.java => TableFactoryContext.java} | 51 ++++--- .../api/table/factory/TableSinkFactory.java | 4 +- .../api/table/factory/TableSourceFactory.java | 4 +- seatunnel-translation/pom.xml | 21 +++ .../seatunnel-translation-base/pom.xml | 26 ++++ .../serialization/RowSerialization.java | 26 ++-- .../translation/source/CoordinatedSource.java | 5 +- .../translation/source/ParallelSource.java | 4 +- .../seatunnel-translation-flink/pom.xml | 32 +++++ .../flink/serialization/FlinkRowSerialization.java | 19 +++ .../seatunnel-translation-spark/pom.xml | 24 ++++ 38 files changed, 630 insertions(+), 129 deletions(-) diff --git a/pom.xml b/pom.xml index 5804f998..ed98a286 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,7 @@ <module>seatunnel-examples</module> <module>seatunnel-e2e</module> <module>seatunnel-api</module> + <module>seatunnel-translation</module> </modules> <properties> diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java similarity index 57% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java index 9742f1df..61703486 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/serialization/Serializer.java @@ -15,25 +15,27 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.factory; +package org.apache.seatunnel.api.serialization; -import org.apache.seatunnel.api.table.catalog.Catalog; +import java.io.IOException; -import java.util.Map; - -public interface CatalogFactory { +public interface Serializer<T> { /** - * Returns a unique identifier among same factory interfaces. + * Serializes the given object. * - * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code - * kafka}). If multiple factories exist for different versions, a version should be appended - * using "-" (e.g. {@code elasticsearch-7}). + * @param obj The object to serialize. + * @return The serialized data (bytes). + * @throws IOException Thrown, if the serialization fails. */ - String factoryIdentifier(); + byte[] serialize(T obj) throws IOException; /** - * Creates a {@link Catalog} using the options. + * De-serializes the given data (bytes). + * + * @param serialized The serialized data + * @return The deserialized object + * @throws IOException Thrown, if the deserialization fails. */ - Catalog createCatalog(String catalogName, Map<String, String> options); + T deserialize(byte[] serialized) throws IOException; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java index 9e1d73e6..2c1fedef 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/Sink.java @@ -17,8 +17,38 @@ package org.apache.seatunnel.api.sink; +import org.apache.seatunnel.api.serialization.Serializer; + +import java.io.IOException; import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +public interface Sink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> extends Serializable { + + SinkWriter<IN, StateT> createWriter(SinkWriter.Context context) throws IOException; + + default SinkWriter<IN, StateT> restoreWriter(SinkWriter.Context context, List<StateT> states) throws IOException { + return createWriter(context); + } + + default Optional<Serializer<StateT>> getWriterStateSerializer() { + return Optional.empty(); + } + + default Optional<SinkCommitter<CommitInfoT>> createCommitter() throws IOException { + return Optional.empty(); + } + + default Optional<Serializer<CommitInfoT>> getCommitInfoSerializer() { + return Optional.empty(); + } -public interface Sink<IN> extends Serializable { + default Optional<SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>> createAggregatedCommitter() throws IOException { + return Optional.empty(); + } + default Optional<Serializer<AggregatedCommitInfoT>> getAggregatedCommitInfoSerializer() { + return Optional.empty(); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java index e4947fcb..1f38efa3 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java @@ -17,5 +17,13 @@ package org.apache.seatunnel.api.sink; -public interface SinkAggregatedCommitter { +import java.io.IOException; +import java.util.List; + +public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> { + + List<AggregatedCommitInfoT> commit(List<AggregatedCommitInfoT> aggregatedCommitInfo) + throws IOException, InterruptedException; + + void abort() throws Exception; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java index 176cca65..2dff2525 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommitter.java @@ -17,5 +17,14 @@ package org.apache.seatunnel.api.sink; -public interface SinkCommitter { +import java.io.IOException; +import java.util.List; + +public interface SinkCommitter<CommitInfoT> { + + List<CommitInfoT> prepareCommit(boolean flush) throws IOException, InterruptedException; + + List<CommitInfoT> commit(List<CommitInfoT> committables) throws IOException, InterruptedException; + + void abort() throws Exception; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java index 563d9fa0..933bfeb0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java @@ -17,5 +17,47 @@ package org.apache.seatunnel.api.sink; -public interface SinkWriter { +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface SinkWriter<T, StateT> { + void write(T element) throws IOException, InterruptedException; + + /** + * @return The writer's state. + * @throws IOException if fail to snapshot writer's state. + * @deprecated implement {@link #snapshotState(long)} + */ + default List<StateT> snapshotState() throws IOException { + return Collections.emptyList(); + } + + /** + * @return The writer's state. + * @throws IOException if fail to snapshot writer's state. + */ + default List<StateT> snapshotState(long checkpointId) throws IOException { + return snapshotState(); + } + + interface Context { + + /** + * Gets the configuration with which Flink was started. + */ + Map<String, String> getConfiguration(); + + /** + * @return The index of this subtask. + */ + int getIndexOfSubtask(); + + /** + * @return The number of parallel Sink tasks. + */ + int getNumberOfParallelSubtasks(); + + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java index 5a84fbf7..f36a7f87 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java @@ -17,5 +17,8 @@ package org.apache.seatunnel.api.source; -public interface Collector { +public interface Collector<T> { + + void collect(T record); + } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java index 6dd2e74a..80cf9aaf 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Source.java @@ -17,17 +17,31 @@ package org.apache.seatunnel.api.source; +import org.apache.seatunnel.api.serialization.Serializer; + import java.io.Serializable; /** * The interface for Source. It acts like a factory class that helps construct the {@link * SourceSplitEnumerator} and {@link SourceReader} and corresponding serializers. * - * @param <T> The type of records produced by the source. + * @param <T> The type of records produced by the source. * @param <SplitT> The type of splits handled by the source. - * @param <StateT> The type of state to store. */ public interface Source<T, SplitT extends SourceSplit, StateT> extends Serializable { + /** + * Get the boundedness of this source. + * + * @return the boundedness of this source. + */ + Boundedness getBoundedness(); + + SourceReader<T, SplitT> createReader(SourceReader.Context readerContext) throws Exception; + + SourceSplitEnumerator<SplitT, StateT> createEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext) throws Exception; + + SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(SourceSplitEnumerator.Context<SplitT> enumeratorContext, StateT checkpointState) throws Exception; + Serializer<StateT> getEnumeratorStateSerializer(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java index e1255b95..4d2374b4 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceEvent.java @@ -17,5 +17,10 @@ package org.apache.seatunnel.api.source; -public interface SourceEvent { +import java.io.Serializable; + +/** + * An base class for the events passed between the SourceReaders and Enumerators. + */ +public interface SourceEvent extends Serializable { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java index 7e784f17..dff0527d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceReader.java @@ -17,9 +17,46 @@ package org.apache.seatunnel.api.source; -public interface SourceReader { +import org.apache.seatunnel.api.state.CheckpointListener; - interface SupportCoordinate { +import java.util.List; +import java.util.Map; +public interface SourceReader<T, SplitT extends SourceSplit> extends CheckpointListener { + + void start(Collector<T> output) throws Exception; + + List<SplitT> snapshotState(long checkpointId); + + void addSplits(List<SplitT> splits); + + default void handleSourceEvent(SourceEvent sourceEvent) { + } + + interface Context { + + /** + * Gets the configuration with which Flink was started. + */ + Map<String, String> getConfiguration(); + + /** + * @return The index of this subtask. + */ + int getIndexOfSubtask(); + + /** + * Sends a split request to the source's {@link SourceSplitEnumerator}. This will result in a call to + * the {@link SourceSplitEnumerator#handleSplitRequest(int, String)} method, with this reader's + * parallel subtask id and the hostname where this reader runs. + */ + void sendSplitRequest(); + + /** + * Send a source event to the source coordinator. + * + * @param sourceEvent the source event to coordinator. + */ + void sendSourceEventToCoordinator(SourceEvent sourceEvent); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java index e7b6b9b1..af6e1acf 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplit.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.api.source; -/** An interface for all the Split types to extend. */ +/** + * An interface for all the Split types to extend. + */ public interface SourceSplit { /** diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java index 0dff0b1d..1c2291fa 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java @@ -17,9 +17,73 @@ package org.apache.seatunnel.api.source; -public interface SourceSplitEnumerator { +import org.apache.seatunnel.api.state.CheckpointListener; - interface SupportCoordinate { +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT> extends CheckpointListener { + + void handleSplitRequest(int subtaskId, String requesterHostname); + + void registerReader(int subtaskId); + + StateT snapshotState(long checkpointId) throws Exception; + + default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } + + interface Context<SplitT extends SourceSplit> { + + int currentParallelism(); + + /** + * Get the currently registered readers. The mapping is from subtask id to the reader info. + * + * @return the currently registered readers. + */ + Set<Integer> registeredReaders(); + + /** + * Assign the splits. + * + * @param newSplitAssignments the new split assignments to add. + */ + void assignSplits(Map<Integer, List<SplitT>> newSplitAssignments); + + /** + * Assigns a single split. + * + * <p>When assigning multiple splits, it is more efficient to assign all of them in a single + * call to the {@link #assignSplits} method. + * + * @param split The new split + * @param subtask The index of the operator's parallel subtask that shall receive the split. + */ + default void assignSplit(SplitT split, int subtask) { + Map<Integer, List<SplitT>> splits = new HashMap<>(); + splits.put(subtask, Collections.singletonList(split)); + assignSplits(splits); + } + + /** + * Signals a subtask that it will not receive any further split. + * + * @param subtask The index of the operator's parallel subtask that shall be signaled it will + * not receive any further split. + */ + void signalNoMoreSplits(int subtask); + + /** + * Send a source event to a source reader. The source reader is identified by its subtask id. + * + * @param subtaskId the subtask id of the source reader to send this event to. + * @param event the source event to send. + */ + void sendEventToSourceReader(int subtaskId, SourceEvent event); + } + } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java similarity index 88% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java index 5a84fbf7..2de9a3af 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SupportCoordinate.java @@ -17,5 +17,8 @@ package org.apache.seatunnel.api.source; -public interface Collector { +/** + * Used to mark whether the interface supports coordination. + */ +public interface SupportCoordinate { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java index a48d256c..8abf1398 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java @@ -18,4 +18,9 @@ package org.apache.seatunnel.api.state; public interface CheckpointListener { + + void notifyCheckpointComplete(long checkpointId) throws Exception; + + default void notifyCheckpointAborted(long checkpointId) throws Exception { + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java deleted file mode 100644 index fb09a761..00000000 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/state/StatefulOperator.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.api.state; - -public interface StatefulOperator { -} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java index 7e6a72a3..a8211f3e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java @@ -17,6 +17,7 @@ */ package org.apache.seatunnel.api.table.catalog; + import org.apache.seatunnel.api.table.type.DataType; import java.util.Objects; @@ -36,7 +37,9 @@ public abstract class Column { this.comment = comment; } - /** Creates a regular table column that represents physical data. */ + /** + * Creates a regular table column that represents physical data. + */ public static PhysicalColumn physical(String name, DataType dataType) { return new PhysicalColumn(name, dataType); } @@ -52,8 +55,10 @@ public abstract class Column { return new MetadataColumn(name, dataType, metadataKey); } - /** Add the comment to the column and return the new object. */ - public abstract Column withComment( String comment); + /** + * Add the comment to the column and return the new object. + */ + public abstract Column withComment(String comment); /** * Returns whether the given column is a physical column of a table; neither computed nor @@ -61,22 +66,30 @@ public abstract class Column { */ public abstract boolean isPhysical(); - /** Returns the data type of this column. */ + /** + * Returns the data type of this column. + */ public DataType getDataType() { return this.dataType; } - /** Returns the name of this column. */ + /** + * Returns the name of this column. + */ public String getName() { return name; } - /** Returns the comment of this column. */ + /** + * Returns the comment of this column. + */ public Optional<String> getComment() { return Optional.ofNullable(comment); } - /** Returns a copy of the column with a replaced {@link DataType}. */ + /** + * Returns a copy of the column with a replaced {@link DataType}. + */ public abstract Column copy(DataType newType); @Override @@ -102,7 +115,9 @@ public abstract class Column { // Specific kinds of columns // -------------------------------------------------------------------------------------------- - /** Representation of a physical column. */ + /** + * Representation of a physical column. + */ public static final class PhysicalColumn extends Column { private PhysicalColumn(String name, DataType dataType) { @@ -132,7 +147,9 @@ public abstract class Column { } } - /** Representation of a metadata column. */ + /** + * Representation of a metadata column. + */ public static final class MetadataColumn extends Column { private final String metadataKey; @@ -156,7 +173,7 @@ public abstract class Column { } @Override - public MetadataColumn withComment( String comment) { + public MetadataColumn withComment(String comment) { if (comment == null) { return this; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java index da7b02f3..e3c60ada 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java @@ -22,7 +22,7 @@ import java.util.Objects; public final class TableIdentifier implements Serializable { private static final long serialVersionUID = 1L; - + private final String catalogName; private final String databaseName; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java index ec47b0ff..ac46fd8a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java @@ -32,7 +32,9 @@ public final class TableSchema implements Serializable { return new TableSchema(columns); } - /** Returns all {@link Column}s of this schema. */ + /** + * Returns all {@link Column}s of this schema. + */ public List<Column> getColumns() { return columns; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java index 31b526bc..d91e6670 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/exception/CatalogException.java @@ -17,22 +17,28 @@ package org.apache.seatunnel.api.table.catalog.exception; -/** A catalog-related, runtime exception. */ +/** + * A catalog-related, runtime exception. + */ public class CatalogException extends RuntimeException { - /** @param message the detail message. */ + /** + * @param message the detail message. + */ public CatalogException(String message) { super(message); } - /** @param cause the cause. */ + /** + * @param cause the cause. + */ public CatalogException(Throwable cause) { super(cause); } /** * @param message the detail message. - * @param cause the cause. + * @param cause the cause. */ public CatalogException(String message, Throwable cause) { super(message, cause); diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java index aef9ba52..3fc90e17 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/SupportReadingMetadata.java @@ -25,7 +25,6 @@ import java.util.Map; /** * Used for {@link TableSource} to support metadata columns. - * */ public interface SupportReadingMetadata { diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java index a7a9bf4e..fe0be552 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSink.java @@ -21,5 +21,5 @@ import org.apache.seatunnel.api.sink.Sink; public interface TableSink { - Sink<?> createSink(); + Sink<?, ?, ?, ?> createSink(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java index a1290f81..2267ab1f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/connector/TableSource.java @@ -21,5 +21,5 @@ import org.apache.seatunnel.api.source.Source; public interface TableSource { - Source<?,?,?> createSource(); + Source<?, ?, ?> createSource(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java index 9742f1df..2448c77e 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java @@ -21,16 +21,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog; import java.util.Map; -public interface CatalogFactory { - - /** - * Returns a unique identifier among same factory interfaces. - * - * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code - * kafka}). If multiple factories exist for different versions, a version should be appended - * using "-" (e.g. {@code elasticsearch-7}). - */ - String factoryIdentifier(); +public interface CatalogFactory extends Factory { /** * Creates a {@link Catalog} using the options. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java similarity index 82% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java index 9742f1df..59b0fd51 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/Factory.java @@ -17,11 +17,7 @@ package org.apache.seatunnel.api.table.factory; -import org.apache.seatunnel.api.table.catalog.Catalog; - -import java.util.Map; - -public interface CatalogFactory { +public interface Factory { /** * Returns a unique identifier among same factory interfaces. @@ -31,9 +27,4 @@ public interface CatalogFactory { * using "-" (e.g. {@code elasticsearch-7}). */ String factoryIdentifier(); - - /** - * Creates a {@link Catalog} using the options. - */ - Catalog createCatalog(String catalogName, Map<String, String> options); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java similarity index 78% rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java index 67fcb835..64e5214c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryException.java @@ -17,6 +17,13 @@ package org.apache.seatunnel.api.table.factory; -public final class TableFactoryUtil { +public class FactoryException extends RuntimeException { + public FactoryException(String message, Throwable cause) { + super(message, cause); + } + + public FactoryException(String message) { + super(message); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java new file mode 100644 index 00000000..2b7ba445 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.factory; + +import org.apache.seatunnel.api.sink.Sink; +import org.apache.seatunnel.api.source.Source; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +/** + * Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and {@link CatalogFactory}. + */ +public final class FactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); + + public static List<Source> createAndPrepareSource( + List<CatalogTable> multipleTables, + Map<String, String> options, + ClassLoader classLoader, + String factoryIdentifier) { + + + try { + final TableSourceFactory factory = discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier); + List<Source> sources = new ArrayList<>(multipleTables.size()); + if (factory instanceof SupportMultipleTable) { + TableFactoryContext context = new TableFactoryContext(multipleTables, options, classLoader); + SupportMultipleTable multipleTableSourceFactory = ((SupportMultipleTable) factory); + // TODO: create all source + SupportMultipleTable.Result result = multipleTableSourceFactory.applyTables(context); + TableSource multipleTableSource = factory.createSource(new TableFactoryContext(result.getAcceptedTables(), options, classLoader)); + // TODO: handle reading metadata + Source<?, ?, ?> source = multipleTableSource.createSource(); + sources.add(source); + } + return sources; + } catch (Throwable t) { + throw new FactoryException( + String.format( + "Unable to create a source for identifier '%s'.", factoryIdentifier), + t); + } + } + + public static List<Sink> createAndPrepareSink() { + return null; + } + + public static Catalog createCatalog(String catalogName, + Map<String, String> options, + ClassLoader classLoader, + String factoryIdentifier) { + CatalogFactory catalogFactory = discoverFactory(classLoader, CatalogFactory.class, factoryIdentifier); + return catalogFactory.createCatalog(catalogName, options); + } + + @SuppressWarnings("unchecked") + public static <T extends Factory> T discoverFactory( + ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) { + final List<Factory> factories = discoverFactories(classLoader); + + final List<Factory> foundFactories = + factories.stream() + .filter(f -> factoryClass.isAssignableFrom(f.getClass())) + .collect(Collectors.toList()); + + if (foundFactories.isEmpty()) { + throw new FactoryException( + String.format( + "Could not find any factories that implement '%s' in the classpath.", + factoryClass.getName())); + } + + final List<Factory> matchingFactories = + foundFactories.stream() + .filter(f -> f.factoryIdentifier().equals(factoryIdentifier)) + .collect(Collectors.toList()); + + if (matchingFactories.isEmpty()) { + throw new FactoryException( + String.format( + "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + foundFactories.stream() + .map(Factory::factoryIdentifier) + .distinct() + .sorted() + .collect(Collectors.joining("\n")))); + } + + if (matchingFactories.size() > 1) { + throw new FactoryException( + String.format( + "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" + + "Ambiguous factory classes are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + matchingFactories.stream() + .map(f -> f.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return (T) matchingFactories.get(0); + } + + private static List<Factory> discoverFactories(ClassLoader classLoader) { + try { + final List<Factory> result = new LinkedList<>(); + ServiceLoader.load(Factory.class, classLoader) + .iterator() + .forEachRemaining(result::add); + return result; + } catch (ServiceConfigurationError e) { + LOG.error("Could not load service provider for factories.", e); + throw new FactoryException("Could not load service provider for factories.", e); + } + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java index 7c34c354..6b15786a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java @@ -30,7 +30,7 @@ public interface SupportMultipleTable { /** * A connector can pick tables and return the accepted and remaining tables. */ - Result applyTables(TableFactory.Context context); + Result applyTables(TableFactoryContext context); final class Result { private final List<CatalogTable> acceptedTables; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java similarity index 51% rename from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java index b531e71e..6caf0b89 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java @@ -22,31 +22,38 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import java.util.List; import java.util.Map; -public interface TableFactory { - /** - * Returns a unique identifier among same factory interfaces. - * - * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code - * kafka}). If multiple factories exist for different versions, a version should be appended - * using "-" (e.g. {@code elasticsearch-7}). - */ - String factoryIdentifier(); +public class TableFactoryContext { + + private final List<CatalogTable> catalogTables; + private final Map<String, String> options; + private final ClassLoader classLoader; + + public TableFactoryContext( + List<CatalogTable> catalogTables, + Map<String, String> options, + ClassLoader classLoader) { + this.catalogTables = catalogTables; + this.options = options; + this.classLoader = classLoader; + } - /** Provides information describing the multi-table to be accessed. */ - interface Context { - ClassLoader getClassLoader(); + public ClassLoader getClassLoader() { + return this.classLoader; + } - /** - * Returns a list of tables that need to be processed. - * - * <p> By default, return only single table. - * - * <p> If you need multiple tables, implement {@link SupportMultipleTable}. - */ - List<CatalogTable> getCatalogTable(); + /** + * Returns a list of tables that need to be processed. + * + * <p> By default, return only single table. + * + * <p> If you need multiple tables, implement {@link SupportMultipleTable}. + */ + public List<CatalogTable> getCatalogTables() { + return catalogTables; + } - /** Gives read-only access to the configuration of the current session. */ - Map<String, String> getOptions(); + public Map<String, String> getOptions() { + return this.options; } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java index 33319659..bb92a76a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java @@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.table.connector.TableSink; -public interface TableSinkFactory extends TableFactory { +public interface TableSinkFactory extends Factory { - TableSink createSink(TableFactory.Context context); + TableSink createSink(TableFactoryContext context); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java index 8318ab3a..241deeb0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java @@ -19,7 +19,7 @@ package org.apache.seatunnel.api.table.factory; import org.apache.seatunnel.api.table.connector.TableSource; -public interface TableSourceFactory extends TableFactory { +public interface TableSourceFactory extends Factory { - TableSource createSource(TableFactory.Context context); + TableSource createSource(TableFactoryContext context); } diff --git a/seatunnel-translation/pom.xml b/seatunnel-translation/pom.xml new file mode 100644 index 00000000..3a44957b --- /dev/null +++ b/seatunnel-translation/pom.xml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>seatunnel</artifactId> + <groupId>org.apache.seatunnel</groupId> + <version>2.0.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>seatunnel-translation</artifactId> + <packaging>pom</packaging> + + <modules> + <module>seatunnel-translation-base</module> + <module>seatunnel-translation-flink</module> + <module>seatunnel-translation-spark</module> + </modules> + +</project> \ No newline at end of file diff --git a/seatunnel-translation/seatunnel-translation-base/pom.xml b/seatunnel-translation/seatunnel-translation-base/pom.xml new file mode 100644 index 00000000..f9bbaff4 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-base/pom.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>seatunnel</artifactId> + <groupId>org.apache.seatunnel</groupId> + <version>2.0.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>seatunnel-translation-base</artifactId> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-api</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java similarity index 55% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java index 9742f1df..06bd52e9 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/CatalogFactory.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowSerialization.java @@ -15,25 +15,29 @@ * limitations under the License. */ -package org.apache.seatunnel.api.table.factory; +package org.apache.seatunnel.translation.serialization; -import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.type.Row; -import java.util.Map; +import java.io.IOException; -public interface CatalogFactory { +public interface RowSerialization<T> { /** - * Returns a unique identifier among same factory interfaces. + * Serializes the given object. * - * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code - * kafka}). If multiple factories exist for different versions, a version should be appended - * using "-" (e.g. {@code elasticsearch-7}). + * @param seaTunnelRow The object to serialize. + * @return The serialized data (bytes). + * @throws IOException Thrown, if the serialization fails. */ - String factoryIdentifier(); + T serialize(Row seaTunnelRow) throws IOException; /** - * Creates a {@link Catalog} using the options. + * De-serializes the given data (bytes). + * + * @param engineRow The internal engine row + * @return The SeaTunnel Row + * @throws IOException Thrown, if the deserialization fails. */ - Catalog createCatalog(String catalogName, Map<String, String> options); + Row deserialize(T engineRow) throws IOException; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java similarity index 90% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java index 563d9fa0..2816df97 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java @@ -15,7 +15,8 @@ * limitations under the License. */ -package org.apache.seatunnel.api.sink; +package org.apache.seatunnel.translation.source; + +public class CoordinatedSource { -public interface SinkWriter { } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java similarity index 91% copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java copy to seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java index 563d9fa0..975fbfa7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.api.sink; +package org.apache.seatunnel.translation.source; -public interface SinkWriter { +public class ParallelSource { } diff --git a/seatunnel-translation/seatunnel-translation-flink/pom.xml b/seatunnel-translation/seatunnel-translation-flink/pom.xml new file mode 100644 index 00000000..7c8c3900 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/pom.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>seatunnel</artifactId> + <groupId>org.apache.seatunnel</groupId> + <version>2.0.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>seatunnel-translation-flink</artifactId> + + <properties> + <flink.version>1.13.6</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-translation-base</artifactId> + <version>${project.version}</version> + </dependency> + <!-- apache flink table --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java new file mode 100644 index 00000000..b45d100d --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowSerialization.java @@ -0,0 +1,19 @@ +package org.apache.seatunnel.translation.flink.serialization; + +import org.apache.flink.types.Row; +import org.apache.seatunnel.translation.serialization.RowSerialization; + +import java.io.IOException; + +public class FlinkRowSerialization implements RowSerialization<Row> { + + @Override + public Row serialize(org.apache.seatunnel.api.table.type.Row seaTunnelRow) throws IOException { + return null; + } + + @Override + public org.apache.seatunnel.api.table.type.Row deserialize(Row engineRow) throws IOException { + return null; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/pom.xml b/seatunnel-translation/seatunnel-translation-spark/pom.xml new file mode 100644 index 00000000..c381244a --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/pom.xml @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>seatunnel</artifactId> + <groupId>org.apache.seatunnel</groupId> + <version>2.0.5-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>seatunnel-translation-spark</artifactId> + + <properties> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-translation-base</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file
