This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 55162dcc5cc [FLINK-33262][table-api] Extend source provider interfaces
with the new parallelism provider interface
55162dcc5cc is described below
commit 55162dcc5cca6db6aeedddb30d80dd9f9b8d5202
Author: Zhanghao Chen <[email protected]>
AuthorDate: Sat Nov 4 21:00:25 2023 +0800
[FLINK-33262][table-api] Extend source provider interfaces with the new
parallelism provider interface
Close apache/flink#23663
---
.../connector/source/DataStreamScanProvider.java | 4 +++-
.../connector/source/SourceFunctionProvider.java | 21 ++++++++++++++++++++-
.../flink/table/connector/ParallelismProvider.java | 12 ++++++------
.../table/connector/source/InputFormatProvider.java | 21 ++++++++++++++++++++-
.../table/connector/source/SourceProvider.java | 18 +++++++++++++++++-
.../apache/flink/table/factories/FactoryUtil.java | 9 +++++++++
.../connectors/TransformationScanProvider.java | 4 +++-
7 files changed, 78 insertions(+), 11 deletions(-)
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
index 7fc4687363f..213e3806327 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.data.RowData;
@@ -35,7 +36,8 @@ import org.apache.flink.table.data.RowData;
* or {@link InputFormatProvider}.
*/
@PublicEvolving
-public interface DataStreamScanProvider extends
ScanTableSource.ScanRuntimeProvider {
+public interface DataStreamScanProvider
+ extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
/**
* Creates a scan Java {@link DataStream} from a {@link
StreamExecutionEnvironment}.
diff --git
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
index ff7238d8a2f..e5c35525e16 100644
---
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
+++
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/SourceFunctionProvider.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.connector.source;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
/**
* Provider of a {@link SourceFunction} instance as a runtime implementation
for {@link
* ScanTableSource}.
@@ -32,10 +37,19 @@ import org.apache.flink.table.data.RowData;
*/
@Deprecated
@PublicEvolving
-public interface SourceFunctionProvider extends
ScanTableSource.ScanRuntimeProvider {
+public interface SourceFunctionProvider
+ extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
/** Helper method for creating a static provider. */
static SourceFunctionProvider of(SourceFunction<RowData> sourceFunction,
boolean isBounded) {
+ return of(sourceFunction, isBounded, null);
+ }
+
+ /** Helper method for creating a static provider with a provided source
parallelism. */
+ static SourceFunctionProvider of(
+ SourceFunction<RowData> sourceFunction,
+ boolean isBounded,
+ @Nullable Integer sourceParallelism) {
return new SourceFunctionProvider() {
@Override
public SourceFunction<RowData> createSourceFunction() {
@@ -46,6 +60,11 @@ public interface SourceFunctionProvider extends
ScanTableSource.ScanRuntimeProvi
public boolean isBounded() {
return isBounded;
}
+
+ @Override
+ public Optional<Integer> getParallelism() {
+ return Optional.ofNullable(sourceParallelism);
+ }
};
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
index f9c4684383a..27e4047a016 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ParallelismProvider.java
@@ -20,14 +20,13 @@ package org.apache.flink.table.connector;
import org.apache.flink.annotation.PublicEvolving;
import
org.apache.flink.table.connector.sink.DynamicTableSink.SinkRuntimeProvider;
+import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import java.util.Optional;
/**
* Parallelism provider for other connector providers. It allows to express a
custom parallelism for
- * the connector runtime implementation. Otherwise the parallelism is
determined by the planner.
- *
- * <p>Note: Currently, this interface only works with {@link
SinkRuntimeProvider}.
+ * the connector runtime implementation. Otherwise, the parallelism is
determined by the planner.
*/
@PublicEvolving
public interface ParallelismProvider {
@@ -38,9 +37,10 @@ public interface ParallelismProvider {
* <p>The parallelism denotes how many parallel instances of a source or
sink will be spawned
* during the execution.
*
- * <p>Enforcing a different parallelism for sinks might mess up the
changelog if the input is
- * not {@link ChangelogMode#insertOnly()}. Therefore, a primary key is
required by which the
- * input will be shuffled before records enter the {@link
SinkRuntimeProvider} implementation.
+ * <p>Enforcing a different parallelism for sources/sinks might mess up
the changelog if the
+ * output/input is not {@link ChangelogMode#insertOnly()}. Therefore, a
primary key is required
+ * by which the output/input will be shuffled after/before records
leave/enter the {@link
+ * ScanRuntimeProvider}/{@link SinkRuntimeProvider} implementation.
*
* @return empty if the connector does not provide a custom parallelism,
then the planner will
* decide the number of parallel instances by itself.
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
index bbf20e1b193..a9775becc33 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/InputFormatProvider.java
@@ -20,18 +20,32 @@ package org.apache.flink.table.connector.source;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
/**
* Provider of an {@link InputFormat} instance as a runtime implementation for
{@link
* ScanTableSource}.
*/
@PublicEvolving
-public interface InputFormatProvider extends
ScanTableSource.ScanRuntimeProvider {
+public interface InputFormatProvider
+ extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
/** Helper method for creating a static provider. */
static InputFormatProvider of(InputFormat<RowData, ?> inputFormat) {
+ return of(inputFormat, null);
+ }
+
+ /** Helper method for creating a static provider with a provided source
parallelism. */
+ static InputFormatProvider of(
+ InputFormat<RowData, ?> inputFormat, @Nullable Integer
sourceParallelism) {
+
return new InputFormatProvider() {
+
@Override
public InputFormat<RowData, ?> createInputFormat() {
return inputFormat;
@@ -41,6 +55,11 @@ public interface InputFormatProvider extends
ScanTableSource.ScanRuntimeProvider
public boolean isBounded() {
return true;
}
+
+ @Override
+ public Optional<Integer> getParallelism() {
+ return Optional.ofNullable(sourceParallelism);
+ }
};
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
index f0e85f88624..2d9f5143626 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/SourceProvider.java
@@ -21,8 +21,13 @@ package org.apache.flink.table.connector.source;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.data.RowData;
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
/**
* Provider of a {@link Source} instance as a runtime implementation for
{@link ScanTableSource}.
*
@@ -30,11 +35,17 @@ import org.apache.flink.table.data.RowData;
* advanced connector developers.
*/
@PublicEvolving
-public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider {
+public interface SourceProvider extends ScanTableSource.ScanRuntimeProvider,
ParallelismProvider {
/** Helper method for creating a static provider. */
static SourceProvider of(Source<RowData, ?, ?> source) {
+ return of(source, null);
+ }
+
+ /** Helper method for creating a Source provider with a provided source
parallelism. */
+ static SourceProvider of(Source<RowData, ?, ?> source, @Nullable Integer
sourceParallelism) {
return new SourceProvider() {
+
@Override
public Source<RowData, ?, ?> createSource() {
return source;
@@ -44,6 +55,11 @@ public interface SourceProvider extends
ScanTableSource.ScanRuntimeProvider {
public boolean isBounded() {
return Boundedness.BOUNDED.equals(source.getBoundedness());
}
+
+ @Override
+ public Optional<Integer> getParallelism() {
+ return Optional.ofNullable(sourceParallelism);
+ }
};
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index 8f103be0c7e..d8d6d7e9000 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -120,6 +120,15 @@ public final class FactoryUtil {
.defaultValues("rest")
.withDescription("Specify the endpoints that are used.");
+ public static final ConfigOption<Integer> SOURCE_PARALLELISM =
+ ConfigOptions.key("scan.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines a custom parallelism for the source. "
+ + "By default, if this option is not
defined, the planner will derive the parallelism "
+ + "for each statement individually by also
considering the global configuration.");
+
public static final ConfigOption<WatermarkEmitStrategy>
WATERMARK_EMIT_STRATEGY =
ConfigOptions.key("scan.watermark.emit.strategy")
.enumType(WatermarkEmitStrategy.class)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
index e6642bc2ab1..46e739ec54d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/TransformationScanProvider.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.connectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -37,7 +38,8 @@ import org.apache.flink.table.data.RowData;
* SourceFunctionProvider}, or {@link SourceProvider}.
*/
@Internal
-public interface TransformationScanProvider extends
ScanTableSource.ScanRuntimeProvider {
+public interface TransformationScanProvider
+ extends ScanTableSource.ScanRuntimeProvider, ParallelismProvider {
/**
* Creates a {@link Transformation} instance.