This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4aad10fcb [flink] Support pre create log source (#3414)
4aad10fcb is described below
commit 4aad10fcb51109e9087ff7153d7200bc62f69587
Author: GuojunLi <[email protected]>
AuthorDate: Thu May 30 15:02:48 2024 +0800
[flink] Support pre create log source (#3414)
---
.../java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java | 5 +++++
.../main/java/org/apache/paimon/flink/log/LogSourceProvider.java | 7 +++++++
.../java/org/apache/paimon/flink/source/FlinkSourceBuilder.java | 1 +
3 files changed, 13 insertions(+)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
index c7d20deed..40d5f0638 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
@@ -88,6 +88,11 @@ public class KafkaLogSourceProvider implements
LogSourceProvider {
this.timestampMills = timestampMills;
}
+ @Override
+ public void preCreateSource() {
+ // nothing to do before the log source is created
+ }
+
@Override
public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long>
bucketOffsets) {
switch (consistency) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
index 08a697dcf..5a780fb63 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
@@ -40,4 +40,11 @@ public interface LogSourceProvider extends Serializable {
* @param bucketOffsets optional, configure if you need to specify the
startup offset.
*/
Source<RowData, ?, ?> createSource(@Nullable Map<Integer, Long>
bucketOffsets);
+
+ /**
+ * Performs any preparation needed before the log {@link Source} is created, such as
setting system properties
+ * before the job is submitted. This is required because the log {@link Source} inside a
{@link HybridSource}
+ * is only created while the job is running.
+ */
+ void preCreateSource();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 2e69fdfa5..32e61903c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -262,6 +262,7 @@ public class FlinkSourceBuilder {
StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);
if (logSourceProvider != null && streamingReadMode != FILE) {
+ logSourceProvider.preCreateSource();
if (startupMode != StartupMode.LATEST_FULL) {
return toDataStream(logSourceProvider.createSource(null));
} else {