This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aad10fcb [flink] Support pre create log source (#3414)
4aad10fcb is described below

commit 4aad10fcb51109e9087ff7153d7200bc62f69587
Author: GuojunLi <[email protected]>
AuthorDate: Thu May 30 15:02:48 2024 +0800

    [flink] Support pre create log source (#3414)
---
 .../java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java | 5 +++++
 .../main/java/org/apache/paimon/flink/log/LogSourceProvider.java   | 7 +++++++
 .../java/org/apache/paimon/flink/source/FlinkSourceBuilder.java    | 1 +
 3 files changed, 13 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
index c7d20deed..40d5f0638 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSourceProvider.java
@@ -88,6 +88,11 @@ public class KafkaLogSourceProvider implements 
LogSourceProvider {
         this.timestampMills = timestampMills;
     }
 
+    @Override
+    public void preCreateSource() {
+        // nothing to do before log source creating
+    }
+
     @Override
     public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long> 
bucketOffsets) {
         switch (consistency) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
index 08a697dcf..5a780fb63 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogSourceProvider.java
@@ -40,4 +40,11 @@ public interface LogSourceProvider extends Serializable {
      * @param bucketOffsets optional, configure if you need to specify the 
startup offset.
      */
     Source<RowData, ?, ?> createSource(@Nullable Map<Integer, Long> 
bucketOffsets);
+
+    /**
+     * Do pre-operations before log {@link Source} creation if you need, like 
system properties
+     * setting before job submitting, for the log {@link Source} inside of 
{@link HybridSource} will
+     * be created during job running.
+     */
+    void preCreateSource();
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 2e69fdfa5..32e61903c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -262,6 +262,7 @@ public class FlinkSourceBuilder {
         StreamingReadMode streamingReadMode = CoreOptions.streamReadType(conf);
 
         if (logSourceProvider != null && streamingReadMode != FILE) {
+            logSourceProvider.preCreateSource();
             if (startupMode != StartupMode.LATEST_FULL) {
                 return toDataStream(logSourceProvider.createSource(null));
             } else {

Reply via email to