This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 231542bbdbe96ecd3bfcc3f6c2f4c8aa2bc56306 Author: Joao Boto <[email protected]> AuthorDate: Mon Jul 8 16:14:38 2024 +0200 [FLINK-35494] Reorganize sources --- .../flink/connector/jdbc/JdbcInputFormat.java | 7 +++-- .../flink/connector/jdbc/core/datastream/Jdbc.java | 7 +++++ .../{ => core/datastream}/source/JdbcSource.java | 20 ++++++------- .../datastream}/source/JdbcSourceBuilder.java | 29 ++++++++---------- .../datastream}/source/JdbcSourceOptions.java | 2 +- .../enumerator/JdbcSourceEnumStateSerializer.java | 6 ++-- .../source/enumerator/JdbcSourceEnumerator.java | 4 +-- .../enumerator/JdbcSourceEnumeratorState.java | 4 +-- .../enumerator/JdbcSqlSplitEnumeratorBase.java | 4 +-- .../enumerator/SqlTemplateSplitEnumerator.java | 6 ++-- .../source/reader/JdbcRecordEmitter.java | 6 ++-- .../source/reader/JdbcSourceReader.java | 6 ++-- .../source/reader/JdbcSourceSplitReader.java | 16 +++++----- .../datastream}/source/reader/RecordAndOffset.java | 4 +-- .../source/reader/extractor/ResultExtractor.java | 2 +- .../reader/extractor/RowResultExtractor.java | 2 +- .../source/split/CheckpointedOffset.java | 2 +- .../datastream}/source/split/JdbcSourceSplit.java | 2 +- .../source/split/JdbcSourceSplitSerializer.java | 2 +- .../source/split/JdbcSourceSplitState.java | 2 +- .../jdbc/table/RowDataResultExtractor.java | 2 +- .../reader/splitreader/TestingSplitsChange.java | 2 +- .../flink/connector/jdbc/JdbcDataTestBase.java | 2 +- .../datastream}/source/JdbcSourceBuilderTest.java | 12 ++++---- .../datastream}/source/JdbcSourceITCase.java | 7 +++-- .../source/JdbcSourceStreamRelatedITCase.java | 9 ++++-- .../JdbcSourceEnumStateSerializerTest.java | 13 +++++---- .../enumerator/JdbcSourceEnumeratorTest.java | 11 ++++--- .../source/reader/JdbcSourceReaderTest.java | 10 ++++--- .../source/reader/JdbcSourceSplitReaderTest.java | 11 ++++--- .../split/JdbcSourceSplitSerializerTest.java | 7 +++-- .../flink/connector/jdbc/source/JdbcSource.java | 25 +++++++++------- .../connector/jdbc/source/JdbcSourceBuilder.java | 33 ++++++++++----------- .../source/reader/extractor/ResultExtractor.java | 34 +++++----------------- 34 files changed, 159 insertions(+), 152 deletions(-) diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java index 20e1a2c7..dacf3c6c 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java @@ -27,6 +27,8 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; @@ -99,9 +101,8 @@ import java.util.Arrays; * @see JdbcParameterValuesProvider * @see PreparedStatement * @see DriverManager - * @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The - * builder utils and parameters passing could be view {@link - * org.apache.flink.connector.jdbc.source.JdbcSourceBuilder}. + * @deprecated Please use {@link JdbcSource} instead. The builder utils and parameters passing could + * be view {@link JdbcSourceBuilder}. */ @Deprecated @Experimental diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java index 2e09c5f4..433a2626 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/Jdbc.java @@ -20,11 +20,18 @@ package org.apache.flink.connector.jdbc.core.datastream; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSinkBuilder; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder; /** Facade to create JDBC stream sources and sinks. */ @PublicEvolving public class Jdbc { + /** Create a JDBC source builder. */ + public static <OUT> JdbcSourceBuilder<OUT> sourceBuilder() { + return JdbcSource.builder(); + } + /** Create a JDBC sink builder. */ public static <IN> JdbcSinkBuilder<IN> sinkBuilder() { return JdbcSink.builder(); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java similarity index 89% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java index 08e4a777..eef58c07 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSource.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; @@ -30,16 +30,16 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; -import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; -import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java similarity index 91% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java index 8d52f7ac..451082a0 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilder.java @@ -16,17 +16,17 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; @@ -46,12 +46,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Objects; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE; - /** * A tool is used to build {@link JdbcSource} quickly. * @@ -130,9 +124,9 @@ public class JdbcSourceBuilder<OUT> { JdbcSourceBuilder() { this.configuration = new Configuration(); this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); - this.splitReaderFetchBatchSize = READER_FETCH_BATCH_SIZE.defaultValue(); - this.resultSetType = RESULTSET_TYPE.defaultValue(); - this.resultSetConcurrency = RESULTSET_CONCURRENCY.defaultValue(); + this.splitReaderFetchBatchSize = JdbcSourceOptions.READER_FETCH_BATCH_SIZE.defaultValue(); + this.resultSetType = JdbcSourceOptions.RESULTSET_TYPE.defaultValue(); + this.resultSetConcurrency = JdbcSourceOptions.RESULTSET_CONCURRENCY.defaultValue(); this.deliveryGuarantee = DeliveryGuarantee.NONE; // Boolean to distinguish between default value and explicitly set autoCommit mode. this.autoCommit = true; @@ -275,7 +269,7 @@ public class JdbcSourceBuilder<OUT> { public JdbcSource<OUT> build() { this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); if (resultSetFetchSize > 0) { - this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize); + this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize); } if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { @@ -286,10 +280,11 @@ public class JdbcSourceBuilder<OUT> { DeliveryGuarantee.EXACTLY_ONCE); } - this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency); - this.configuration.set(RESULTSET_TYPE, resultSetType); - this.configuration.set(READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); - this.configuration.set(AUTO_COMMIT, autoCommit); + this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, resultSetConcurrency); + this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, resultSetType); + this.configuration.set( + JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); + this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit); Preconditions.checkState( !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty."); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java similarity index 97% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java index c83ad7b5..294be051 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceOptions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java similarity index 94% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java index 2d2283ac..2be1deb1 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializer.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializer.java @@ -16,10 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java index a26905fc..9ebf389a 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java @@ -16,13 +16,13 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java similarity index 95% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java index 2cd3903c..1c7d5ad4 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorState.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorState.java @@ -16,10 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java similarity index 95% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java index 25d09a71..dd3eb34b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSqlSplitEnumeratorBase.java @@ -16,10 +16,10 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import javax.annotation.Nonnull; import javax.annotation.Nullable; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java similarity index 95% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java index 2099d0e0..b4565d95 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/SqlTemplateSplitEnumerator.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java similarity index 86% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java index 803f7f3c..d707c68d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcRecordEmitter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcRecordEmitter.java @@ -16,12 +16,12 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitState; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitState; /** * The JDBC resorce emitter. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java similarity index 91% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java index 7d94d271..3a5edeb7 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReader.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReader.java @@ -16,14 +16,14 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitState; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitState; import java.util.Map; import java.util.function.Supplier; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java similarity index 94% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java index 5011f42b..7e1af42d 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -29,9 +29,9 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -51,11 +51,11 @@ import java.util.List; import java.util.Objects; import java.util.Queue; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.AUTO_COMMIT; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_TYPE; /** * The JDBC source reader to read data from jdbc splits. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java similarity index 91% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java index eaf0981b..9b0d2861 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/RecordAndOffset.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/RecordAndOffset.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; -import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset; +import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; /** * Util class to represent the record with the corresponding information. diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java similarity index 95% copy from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java copy to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java index 19bd6672..5a2a0e7b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/ResultExtractor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader.extractor; +package org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.types.Row; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java similarity index 94% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java index 6b0d414e..cc050acc 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/RowResultExtractor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/extractor/RowResultExtractor.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader.extractor; +package org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java similarity index 97% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java index e6fa7572..b49a2b22 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/CheckpointedOffset.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/CheckpointedOffset.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.split; +package org.apache.flink.connector.jdbc.core.datastream.source.split; import org.apache.flink.annotation.PublicEvolving; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java index 7a690ca9..f57169ba 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplit.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.split; +package org.apache.flink.connector.jdbc.core.datastream.source.split; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceSplit; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java index 3f6afb6e..86775fbf 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.split; +package org.apache.flink.connector.jdbc.core.datastream.source.split; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.InstantiationUtil; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java similarity index 98% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java rename to flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java index 2e3b2c3d..98d523b2 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitState.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.split; +package org.apache.flink.connector.jdbc.core.datastream.source.split; import org.apache.flink.util.FlinkRuntimeException; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java index 2bb65b80..534f4921 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/table/RowDataResultExtractor.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java index 97d2f7dd..962cdd81 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/base/source/reader/splitreader/TestingSplitsChange.java @@ -18,7 +18,7 @@ package org.apache.flink.connector.base.source.reader.splitreader; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import java.util.List; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java index d61100ef..808c7124 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTestBase.java @@ -17,8 +17,8 @@ package org.apache.flink.connector.jdbc; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.derby.testutils.DerbyMetadata; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java similarity index 93% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java index a760749a..36130033 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceBuilderTest.java @@ -16,12 +16,12 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; @@ -34,12 +34,12 @@ import org.junit.jupiter.api.Test; import java.io.Serializable; import java.time.Duration; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; +import static org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link JdbcSourceBuilder}. */ +/** Test for {@link org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder}. */ class JdbcSourceBuilderTest { private final String emptySql = ""; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java similarity index 97% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java index 845cfde5..2021094e 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -39,7 +39,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; import static org.assertj.core.api.Assertions.assertThat; -/** The Integration test for {@link JdbcSource}. */ +/** + * The Integration test for {@link + * org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource}. + */ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase { public static Queue<TestEntry> collectedRecords; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java similarity index 98% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java index 2be8b404..445f6ba9 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source; +package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -27,8 +27,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.derby.DerbyTestBase; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; @@ -65,7 +65,10 @@ import java.util.function.Supplier; import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME; import static org.assertj.core.api.Assertions.assertThat; -/** Test for streaming semantic related cases of {@link JdbcSource}. */ +/** + * Test for streaming semantic related cases of {@link + * org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource}. + */ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase { private static final long ONE_SECOND = Duration.ofSeconds(1L).toMillis(); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java similarity index 88% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java index ce7548d2..6bc9cee7 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumStateSerializerTest.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; -import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.junit.jupiter.api.Test; @@ -32,7 +32,10 @@ import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link JdbcSourceEnumStateSerializer}. */ +/** + * Test for {@link + * org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer}. + */ class JdbcSourceEnumStateSerializerTest { private final JdbcSourceEnumeratorState state = diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java similarity index 91% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java index 443fb05b..268fb73f 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.enumerator; +package org.apache.flink.connector.jdbc.core.datastream.source.enumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.CheckpointedOffset; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; import org.junit.jupiter.api.BeforeEach; @@ -38,7 +38,10 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link JdbcSourceEnumerator}. */ +/** + * Test for {@link + * org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator}. + */ class JdbcSourceEnumeratorTest { private static long splitId = 1L; diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java similarity index 90% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java index f549869f..ab6cc327 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceReaderTest.java @@ -16,16 +16,16 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcDataTestBase; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.junit.jupiter.api.Test; @@ -34,7 +34,9 @@ import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link JdbcSourceReader}. */ +/** + * Test for {@link org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader}. + */ class JdbcSourceReaderTest extends JdbcDataTestBase { @Test diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java similarity index 94% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java index 7ebe0e12..b28238f9 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.reader; +package org.apache.flink.connector.jdbc.core.datastream.source.reader; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -26,10 +26,10 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.TestingSplitsChange; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcDataTestBase; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; import org.junit.jupiter.api.Test; @@ -44,7 +44,10 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link JdbcSourceSplitReader}. */ +/** + * Test for {@link + * org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader}. + */ class JdbcSourceSplitReaderTest extends JdbcDataTestBase { private final JdbcSourceSplit split = diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java similarity index 94% rename from flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java rename to flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java index 9d3f6516..e414da6d 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/split/JdbcSourceSplitSerializerTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.jdbc.source.split; +package org.apache.flink.connector.jdbc.core.datastream.source.split; import org.junit.jupiter.api.Test; @@ -27,7 +27,10 @@ import java.util.Random; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link JdbcSourceSplitSerializer}. */ +/** + * Test for {@link + * org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer}. + */ class JdbcSourceSplitSerializerTest { private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", null, null); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java similarity index 88% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java index 08e4a777..19405624 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java @@ -30,16 +30,16 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; -import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; -import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; -import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; -import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; @@ -50,8 +50,13 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Objects; -/** JDBC source. */ +/** + * JDBC source. + * + * @deprecated please use {@link org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource} + */ @PublicEvolving +@Deprecated public class JdbcSource<OUT> implements Source<OUT, JdbcSourceSplit, JdbcSourceEnumeratorState>, ResultTypeQueryable<OUT> { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java similarity index 90% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java index 8d52f7ac..7e940e92 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java @@ -23,10 +23,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceOptions; +import org.apache.flink.connector.jdbc.core.datastream.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; -import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; @@ -46,12 +47,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Objects; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; -import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE; - /** * A tool is used to build {@link JdbcSource} quickly. * @@ -94,8 +89,11 @@ import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET * @see PreparedStatement * @see DriverManager * @see JdbcSource + * @deprecated please use {@link + * org.apache.flink.connector.jdbc.core.datastream.source.JdbcSourceBuilder} */ @PublicEvolving +@Deprecated public class JdbcSourceBuilder<OUT> { public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class); @@ -130,9 +128,9 @@ public class JdbcSourceBuilder<OUT> { JdbcSourceBuilder() { this.configuration = new Configuration(); this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); - this.splitReaderFetchBatchSize = READER_FETCH_BATCH_SIZE.defaultValue(); - this.resultSetType = RESULTSET_TYPE.defaultValue(); - this.resultSetConcurrency = RESULTSET_CONCURRENCY.defaultValue(); + this.splitReaderFetchBatchSize = JdbcSourceOptions.READER_FETCH_BATCH_SIZE.defaultValue(); + this.resultSetType = JdbcSourceOptions.RESULTSET_TYPE.defaultValue(); + this.resultSetConcurrency = JdbcSourceOptions.RESULTSET_CONCURRENCY.defaultValue(); this.deliveryGuarantee = DeliveryGuarantee.NONE; // Boolean to distinguish between default value and explicitly set autoCommit mode. this.autoCommit = true; @@ -191,7 +189,7 @@ public class JdbcSourceBuilder<OUT> { /** * The continuousUnBoundingSettings to discovery the next available batch splits. Note: If the * value was set, the {@link #jdbcParameterValuesProvider} must specified with the {@link - * org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}. + * JdbcSlideTimingParameterProvider}. */ public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings( ContinuousUnBoundingSettings continuousUnBoundingSettings) { @@ -275,7 +273,7 @@ public class JdbcSourceBuilder<OUT> { public JdbcSource<OUT> build() { this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); if (resultSetFetchSize > 0) { - this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize); + this.configuration.set(JdbcSourceOptions.RESULTSET_FETCH_SIZE, resultSetFetchSize); } if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { @@ -286,10 +284,11 @@ public class JdbcSourceBuilder<OUT> { DeliveryGuarantee.EXACTLY_ONCE); } - this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency); - this.configuration.set(RESULTSET_TYPE, resultSetType); - this.configuration.set(READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); - this.configuration.set(AUTO_COMMIT, autoCommit); + this.configuration.set(JdbcSourceOptions.RESULTSET_CONCURRENCY, resultSetConcurrency); + this.configuration.set(JdbcSourceOptions.RESULTSET_TYPE, resultSetType); + this.configuration.set( + JdbcSourceOptions.READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); + this.configuration.set(JdbcSourceOptions.AUTO_COMMIT, autoCommit); Preconditions.checkState( !StringUtils.isNullOrWhitespaceOnly(sql), "'sql' mustn't be null or empty."); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java similarity index 57% rename from flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java rename to flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java index 19bd6672..79935080 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java @@ -19,39 +19,19 @@ package org.apache.flink.connector.jdbc.source.reader.extractor; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.types.Row; -import java.io.Serializable; import java.sql.ResultSet; -import java.sql.SQLException; /** * The Extractor to extract the data from {@link ResultSet}. * * @param <T> The target data type. + * @deprecated please use {@link + * org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor} */ @PublicEvolving -public interface ResultExtractor<T> extends Serializable { - - /** - * Extract the data from the current point line of the result. - * - * @param resultSet Result set queried from a sql. - * @return The data object filled by the current line of the resultSet. - * @throws SQLException SQL exception. - */ - T extract(ResultSet resultSet) throws SQLException; - - /** - * The identifier of the extractor. - * - * @return identifier in {@link String} type. - */ - default String identifier() { - return this.getClass().getSimpleName(); - } - - static ResultExtractor<Row> ofRowResultExtractor() { - return new RowResultExtractor(); - } -} +@Deprecated +public interface ResultExtractor<T> + extends org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor + .ResultExtractor< + T> {}
