This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 64b4c8246f3e050f57df159adc9acf2f3979738a Author: Joao Boto <[email protected]> AuthorDate: Mon Jul 8 15:39:02 2024 +0200 [FLINK-35367] Reorganize sinks --- .../connector/jdbc/core/datastream/Jdbc.java} | 21 +++++++++--------- .../jdbc/{ => core/datastream}/sink/JdbcSink.java | 14 ++++++------ .../datastream}/sink/JdbcSinkBuilder.java | 2 +- .../datastream}/sink/committer/JdbcCommitable.java | 2 +- .../sink/committer/JdbcCommitableSerializer.java | 2 +- .../datastream}/sink/committer/JdbcCommitter.java | 4 ++-- .../datastream}/sink/writer/JdbcWriter.java | 4 ++-- .../datastream}/sink/writer/JdbcWriterState.java | 2 +- .../sink/writer/JdbcWriterStateSerializer.java | 2 +- .../datasource/transactions/xa/XaTransaction.java | 2 +- .../apache/flink/connector/jdbc/JdbcITCase.java | 22 ++++++++++++++++--- .../datastream}/sink/AtLeastOnceJdbcSinkTest.java | 2 +- .../datastream}/sink/BaseJdbcSinkTest.java | 6 +++--- .../datastream}/sink/ExactlyOnceJdbcSinkTest.java | 2 +- .../sink/writer/AlLeastOnceJdbcWriterTest.java | 4 ++-- .../sink/writer/BaseJdbcWriterTest.java | 10 ++++++--- .../sink/writer/ExactlyOnceJdbcWriterTest.java | 4 ++-- .../sink/writer/JdbcWriterStateSerializerTest.java | 7 ++++-- .../sink/writer/JdbcWriterStateTest.java | 7 ++++-- .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 25 ++++++++++++++++++++-- .../org/apache/flink/connector/jdbc/JdbcSink.java | 5 +++-- .../apache/flink/connector/jdbc/sink/JdbcSink.java | 14 ++++++------ .../flink/connector/jdbc/sink/JdbcSinkBuilder.java | 8 ++++++- 23 files changed, 113 insertions(+), 58 deletions(-) diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java similarity index 55% copy from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java index c0a31ea1..2e09c5f4 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream; -import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; -import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder; -/** Smoke tests for the {@link JdbcSink} and the underlying classes. */ -class ExactlyOnceJdbcSinkTest extends BaseJdbcSinkTest { +/** Facade to create JDBC stream sources and sinks. */ +@PublicEvolving +public class Jdbc { - @Override - protected <T> JdbcSink<T> finishSink(JdbcSinkBuilder<T> builder) { - return builder.withExecutionOptions( - JdbcExecutionOptions.builder().withMaxRetries(0).build()) - .buildExactlyOnce( - JdbcExactlyOnceOptions.defaults(), getMetadata().getXaSourceSupplier()); + /** Create a JDBC sink builder. */ + public static <IN> JdbcSinkBuilder<IN> sinkBuilder() { + return JdbcSink.builder(); } } diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java similarity index 87% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java index 986fcba1..8c0cf999 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; @@ -25,15 +25,15 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitableSerializer; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitter; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitableSerializer; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitter; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriter; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterStateSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java similarity index 98% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java index ede9a00e..12c5a3e7 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSinkBuilder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.base.DeliveryGuarantee; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java similarity index 93% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java index 0a04aab0..97594d5b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitable.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitable.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.committer; +package org.apache.flink.connector.jdbc.core.datastream.sink.committer; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java similarity index 94% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java index b950aced..56a4a21f 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitableSerializer.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitableSerializer.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.committer; +package org.apache.flink.connector.jdbc.core.datastream.sink.committer; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java similarity index 93% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java index 1efa8133..a4576b84 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/committer/JdbcCommitter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/committer/JdbcCommitter.java @@ -1,14 +1,14 @@ -package org.apache.flink.connector.jdbc.sink.committer; +package org.apache.flink.connector.jdbc.core.datastream.sink.committer; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider; import org.apache.flink.connector.jdbc.datasource.transactions.xa.XaTransaction; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState; import org.apache.flink.util.FlinkRuntimeException; import java.io.IOException; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java index 2e167ddb..be7aa49c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.sink2.Sink; @@ -24,6 +24,7 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; @@ -32,7 +33,6 @@ import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.Transac import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java index 7c39827e..c5dece75 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterState.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterState.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java similarity index 97% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java index 89ba5a82..b11f6b45 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializer.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializer.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java index 6fcc1eb6..48896842 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/datasource/transactions/xa/XaTransaction.java @@ -3,11 +3,11 @@ package org.apache.flink.connector.jdbc.datasource.transactions.xa; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState; import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException; import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.TransientXaException; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java index ead2c44f..3b4c60e5 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java @@ -19,8 +19,13 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; +import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; +import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.util.function.FunctionWithException; import org.junit.jupiter.api.Test; @@ -46,7 +51,7 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; -/** Smoke tests for the {@link JdbcSink} and the underlying classes. */ +/** Smoke tests for the {@link GenericJdbcSinkFunction} and the underlying classes. */ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { public static final JdbcStatementBuilder<TestEntry> TEST_ENTRY_JDBC_STATEMENT_BUILDER = @@ -69,7 +74,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { env.setParallelism(1); env.fromElements(TEST_DATA) .addSink( - JdbcSink.sink( + sink( String.format(INSERT_TEMPLATE, INPUT_TABLE), TEST_ENTRY_JDBC_STATEMENT_BUILDER, new JdbcConnectionOptionsBuilder() @@ -100,7 +105,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { return reused; }) .addSink( - JdbcSink.sink( + sink( JdbcTestFixture.INSERT_INTO_WORDS_TEMPLATE, (ps, e) -> { ps.setInt(1, counter.getAndIncrement()); @@ -172,4 +177,15 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { T value = get.apply(rs); return rs.wasNull() ? null : value; } + + private <T> SinkFunction<T> sink( + String sql, + JdbcStatementBuilder<T> statementBuilder, + JdbcConnectionOptions connectionOptions) { + return new GenericJdbcSinkFunction<>( + new JdbcOutputFormat<>( + new SimpleJdbcConnectionProvider(connectionOptions), + JdbcExecutionOptions.defaults(), + () -> JdbcBatchStatementExecutor.simple(sql, statementBuilder))); + } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java similarity index 96% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java index e3a9d28d..7bf7afbc 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/AtLeastOnceJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/AtLeastOnceJdbcSinkTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java similarity index 96% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java index 7224647a..219b374c 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/BaseJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.connector.jdbc.derby.DerbyTestBase; @@ -37,8 +37,8 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; import static org.assertj.core.api.Assertions.assertThat; /** - * Smoke tests for the {@link org.apache.flink.connector.jdbc.sink.JdbcSink} and the underlying - * classes. + * Smoke tests for the {@link org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink} and the + * underlying classes. */ public abstract class BaseJdbcSinkTest implements DerbyTestBase { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java similarity index 95% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java index c0a31ea1..f117d0f6 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/ExactlyOnceJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/ExactlyOnceJdbcSinkTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.sink; +package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java similarity index 94% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java index b22b1b1f..59fb1e4a 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/AlLeastOnceJdbcWriterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/AlLeastOnceJdbcWriterTest.java @@ -1,11 +1,11 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.junit.jupiter.api.Test; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java similarity index 94% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java index 914a2fbf..cfcc2030 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/BaseJdbcWriterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.api.common.JobID; import org.apache.flink.api.connector.sink2.Sink; @@ -6,12 +6,12 @@ import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement; import org.apache.flink.connector.jdbc.derby.DerbyTestBase; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.apache.flink.util.StringUtils; @@ -29,7 +29,11 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doReturn; -/** Base smoke tests for the {@link JdbcWriter} and the underlying classes. */ +/** + * Base smoke tests for the {@link + * org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter} and the underlying + * classes. + */ abstract class BaseJdbcWriterTest implements DerbyTestBase { private static final String JOBID = "6b64d8a9a951e2e8767ae952ad951706"; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java similarity index 95% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java index ab0153d2..409879a2 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/ExactlyOnceJdbcWriterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/ExactlyOnceJdbcWriterTest.java @@ -1,11 +1,11 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.xa.SimpleXaConnectionProvider; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.junit.jupiter.api.Test; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java similarity index 85% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java index 7b8c8e91..8bd14f84 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateSerializerTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateSerializerTest.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.api.common.JobID; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; @@ -10,7 +10,10 @@ import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; -/** Smoke test for {@link JdbcWriterStateSerializer}. */ +/** + * Smoke test for {@link + * org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer}. + */ class JdbcWriterStateSerializerTest { @Test diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java similarity index 86% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java index f454f8be..889a7eed 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/sink/writer/JdbcWriterStateTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriterStateTest.java @@ -1,4 +1,4 @@ -package org.apache.flink.connector.jdbc.sink.writer; +package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.api.common.JobID; import org.apache.flink.connector.jdbc.datasource.transactions.xa.domain.TransactionId; @@ -9,7 +9,10 @@ import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; -/** Smoke test for {@link JdbcWriterState}. */ +/** + * Smoke test for {@link + * org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState}. + */ class JdbcWriterStateTest { @Test diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java index 85db5e37..eb8569db 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java @@ -25,7 +25,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.connector.jdbc.testutils.DatabaseTest; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; @@ -35,10 +35,12 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.SerializableSupplier; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -46,6 +48,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.sql.XADataSource; + import java.time.Duration; import java.util.Collections; import java.util.List; @@ -140,7 +144,7 @@ public abstract class JdbcExactlyOnceSinkE2eTest implements DatabaseTest { .setParallelism(PARALLELISM) .map(new FailingMapper(numElementsPerCheckpoint + (numElementsPerCheckpoint / 2))) .addSink( - JdbcSink.exactlyOnceSink( + exactlyOnceSink( OUTPUT_TABLE.getInsertIntoQuery(), OUTPUT_TABLE.getStatementBuilder(), JdbcExecutionOptions.builder().withMaxRetries(0).build(), @@ -166,6 +170,23 @@ public abstract class JdbcExactlyOnceSinkE2eTest implements DatabaseTest { System.currentTimeMillis() - started); } + private <T> SinkFunction<T> exactlyOnceSink( + String sql, + JdbcStatementBuilder<T> statementBuilder, + JdbcExecutionOptions executionOptions, + JdbcExactlyOnceOptions exactlyOnceOptions, + SerializableSupplier<XADataSource> dataSourceSupplier) { + return new JdbcXaSinkFunction<>( + sql, + statementBuilder, + XaFacade.fromXaDataSourceSupplier( + dataSourceSupplier, + exactlyOnceOptions.getTimeoutSec(), + exactlyOnceOptions.isTransactionPerConnection()), + executionOptions, + exactlyOnceOptions); + } + /** * {@link SourceFunction} emits {@link BooksTable.BookEntry test entries} and waits for the * checkpoint. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java similarity index 96% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java index efd976e2..a46601f6 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java @@ -18,11 +18,11 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; -import org.apache.flink.connector.jdbc.sink.JdbcSinkBuilder; import org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction; import org.apache.flink.connector.jdbc.xa.XaFacade; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -32,6 +32,7 @@ import javax.sql.XADataSource; /** Facade to create JDBC {@link SinkFunction sinks}. */ @PublicEvolving +@Deprecated public class JdbcSink { /** @@ -109,7 +110,7 @@ public class JdbcSink { } public static <IN> JdbcSinkBuilder<IN> builder() { - return org.apache.flink.connector.jdbc.sink.JdbcSink.builder(); + return org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink.builder(); } private JdbcSink() {} diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java similarity index 87% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java index 986fcba1..a1f8a4b4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSink.java @@ -25,15 +25,15 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitable; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitableSerializer; +import org.apache.flink.connector.jdbc.core.datastream.sink.committer.JdbcCommitter; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriter; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterState; +import org.apache.flink.connector.jdbc.core.datastream.sink.writer.JdbcWriterStateSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.statements.JdbcQueryStatement; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitable; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitableSerializer; -import org.apache.flink.connector.jdbc.sink.committer.JdbcCommitter; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriter; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterState; -import org.apache.flink.connector.jdbc.sink.writer.JdbcWriterStateSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import java.io.IOException; @@ -44,8 +44,10 @@ import java.util.Collections; * Flink Sink to produce data into a jdbc database. * * @see JdbcSinkBuilder on how to construct a JdbcSink + * @deprecated please use {@link org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink} */ @PublicEvolving +@Deprecated public class JdbcSink<IN> implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java similarity index 96% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java index ede9a00e..3b073568 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/sink/JdbcSinkBuilder.java @@ -36,8 +36,14 @@ import javax.sql.XADataSource; import static org.apache.flink.util.Preconditions.checkNotNull; -/** Builder to construct {@link JdbcSink}. */ +/** + * Builder to construct {@link JdbcSink}. + * + * @deprecated please use {@link + * org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder} + */ @PublicEvolving +@Deprecated public class JdbcSinkBuilder<IN> { private JdbcExecutionOptions executionOptions;
